Skip to content

Commit 1896a62

Browse files
authored
Merge pull request #1672 from rabbitmq/lukebakken/flaky-tests
Try to address some test flakes
2 parents 735bbca + 2729596 commit 1896a62

File tree

4 files changed

+28
-26
lines changed

4 files changed

+28
-26
lines changed

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ await WithTemporaryNonExclusiveQueueAsync(m, async (_, q) =>
8080
await m.QueueBindAsync(q, x, rk);
8181
await m.BasicPublishAsync(x, rk, _messageBody);
8282

83-
Assert.True(await TestConnectionRecoveryBase.WaitForConfirmsWithCancellationAsync(m));
83+
Assert.True(await WaitForConfirmsWithCancellationAsync(m));
8484
await m.ExchangeDeclarePassiveAsync(x);
8585
});
8686
}
@@ -234,11 +234,11 @@ protected static TaskCompletionSource<bool> PrepareForShutdown(IConnection conn)
234234
return tcs;
235235
}
236236

237-
protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel m)
237+
protected static Task<bool> WaitForConfirmsWithCancellationAsync(IChannel channel)
238238
{
239239
using (var cts = new CancellationTokenSource(TimeSpan.FromSeconds(4)))
240240
{
241-
return m.WaitForConfirmsAsync(cts.Token);
241+
return channel.WaitForConfirmsAsync(cts.Token);
242242
}
243243
}
244244

projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,6 @@ await TestConcurrentOperationsAsync(async () =>
128128

129129
Assert.True(await tcs.Task);
130130
}, Iterations);
131-
132-
_output.WriteLine("@@@@@@@@ PUBLISH ACK COUNT: {0}", publishAckCount);
133131
}
134132
}
135133
}

projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,12 @@ public async Task TestCreateChannelOnClosedAutorecoveringConnectionDoesNotHang()
270270
[Fact]
271271
public async Task TestTopologyRecoveryConsumerFilter()
272272
{
273+
const string exchange = "topology.recovery.exchange";
274+
const string queueWithRecoveredConsumer = "topology.recovery.queue.1";
275+
const string queueWithIgnoredConsumer = "topology.recovery.queue.2";
276+
const string binding1 = "recovered.binding.1";
277+
const string binding2 = "recovered.binding.2";
278+
273279
var filter = new TopologyRecoveryFilter
274280
{
275281
ConsumerFilter = consumer => !consumer.ConsumerTag.Contains("filtered")
@@ -280,17 +286,13 @@ public async Task TestTopologyRecoveryConsumerFilter()
280286
using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter))
281287
{
282288
conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true);
289+
conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception);
290+
conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception);
283291

284292
using (IChannel ch = await conn.CreateChannelAsync())
285293
{
286294
await ch.ConfirmSelectAsync();
287295

288-
string exchange = "topology.recovery.exchange";
289-
string queueWithRecoveredConsumer = "topology.recovery.queue.1";
290-
string queueWithIgnoredConsumer = "topology.recovery.queue.2";
291-
string binding1 = "recovered.binding.1";
292-
string binding2 = "recovered.binding.2";
293-
294296
await ch.ExchangeDeclareAsync(exchange, "direct");
295297
await ch.QueueDeclareAsync(queueWithRecoveredConsumer, false, false, false);
296298
await ch.QueueDeclareAsync(queueWithIgnoredConsumer, false, false, false);
@@ -325,6 +327,7 @@ public async Task TestTopologyRecoveryConsumerFilter()
325327
Assert.True(ch.IsOpen);
326328
await ch.BasicPublishAsync(exchange, binding1, _encoding.GetBytes("test message"));
327329
await ch.BasicPublishAsync(exchange, binding2, _encoding.GetBytes("test message"));
330+
await WaitForConfirmsWithCancellationAsync(ch);
328331

329332
await consumerRecoveryTcs.Task.WaitAsync(TimeSpan.FromSeconds(5));
330333
Assert.True(await consumerRecoveryTcs.Task);

projects/Test/Integration/TestConnectionTopologyRecovery.cs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -228,21 +228,24 @@ public async Task TestTopologyRecoveryBindingFilter()
228228
[Fact]
229229
public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
230230
{
231+
const string exchange = "topology.recovery.exchange";
232+
const string queue1 = "topology.recovery.queue.1";
233+
const string queue2 = "topology.recovery.queue.2";
234+
const string binding1 = "recovered.binding";
235+
const string binding2 = "filtered.binding";
236+
231237
var connectionRecoveryTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
232238
var filter = new TopologyRecoveryFilter();
233239
AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryFilterAsync(filter);
234240
conn.RecoverySucceeded += (source, ea) => connectionRecoveryTcs.SetResult(true);
241+
conn.ConnectionRecoveryError += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception);
242+
conn.CallbackException += (source, ea) => connectionRecoveryTcs.SetException(ea.Exception);
243+
235244
IChannel ch = await conn.CreateChannelAsync();
236245
try
237246
{
238247
await ch.ConfirmSelectAsync();
239248

240-
string exchange = "topology.recovery.exchange";
241-
string queue1 = "topology.recovery.queue.1";
242-
string queue2 = "topology.recovery.queue.2";
243-
string binding1 = "recovered.binding";
244-
string binding2 = "filtered.binding";
245-
246249
await ch.ExchangeDeclareAsync(exchange, "direct");
247250
await ch.QueueDeclareAsync(queue1, false, false, false);
248251
await ch.QueueDeclareAsync(queue2, false, false, false);
@@ -281,16 +284,14 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
281284
await ch.QueueDeclarePassiveAsync(queue1);
282285
await ch.QueueDeclarePassiveAsync(queue2);
283286

284-
await ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message"));
285-
// await ch.WaitForConfirmsOrDieAsync();
286-
287-
await ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message"));
288-
// await ch.WaitForConfirmsOrDieAsync();
287+
var pt1 = ch.BasicPublishAsync(exchange, binding1, true, _encoding.GetBytes("test message"));
288+
var pt2 = ch.BasicPublishAsync(exchange, binding2, true, _encoding.GetBytes("test message"));
289+
await WaitForConfirmsWithCancellationAsync(ch);
290+
await Task.WhenAll(pt1.AsTask(), pt2.AsTask()).WaitAsync(WaitSpan);
289291

290-
await consumerReceivedTcs1.Task.WaitAsync(TimeSpan.FromSeconds(5));
291-
await consumerReceivedTcs2.Task.WaitAsync(TimeSpan.FromSeconds(5));
292-
Assert.True(consumerReceivedTcs1.Task.IsCompletedSuccessfully());
293-
Assert.True(consumerReceivedTcs2.Task.IsCompletedSuccessfully());
292+
await Task.WhenAll(consumerReceivedTcs1.Task, consumerReceivedTcs2.Task).WaitAsync(WaitSpan);
293+
Assert.True(await consumerReceivedTcs1.Task);
294+
Assert.True(await consumerReceivedTcs2.Task);
294295
}
295296
finally
296297
{

0 commit comments

Comments
 (0)