Skip to content

Commit 923d31c

Browse files
committed
Gold-plating async tests.
1 parent 1fca1ba commit 923d31c

9 files changed

+63
-42
lines changed

projects/Test/AsyncIntegration/AsyncIntegrationFixture.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System.Collections.Generic;
3233
using System.Threading.Tasks;
3334
using RabbitMQ.Client;
3435
using Xunit;
@@ -53,6 +54,18 @@ protected override void SetUp()
5354
// InitializeAsync
5455
}
5556

57+
protected static Task AssertRanToCompletion(params Task[] tasks)
58+
{
59+
return AssertRanToCompletion(tasks);
60+
}
61+
62+
protected static async Task AssertRanToCompletion(IEnumerable<Task> tasks)
63+
{
64+
Task whenAllTask = Task.WhenAll(tasks);
65+
await whenAllTask;
66+
Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status);
67+
}
68+
5669
public virtual async Task InitializeAsync()
5770
{
5871
_connFactory = CreateConnectionFactory();

projects/Test/AsyncIntegration/TestAsyncConsumer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ public class TestAsyncConsumer : AsyncIntegrationFixture
4343
{
4444
private readonly ShutdownEventArgs _closeArgs = new ShutdownEventArgs(ShutdownInitiator.Application, Constants.ReplySuccess, "normal shutdown");
4545

46-
public TestAsyncConsumer(ITestOutputHelper output) : base(output, true, 2)
46+
public TestAsyncConsumer(ITestOutputHelper output)
47+
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 2)
4748
{
4849
}
4950

@@ -113,7 +114,7 @@ public async Task TestBasicRoundtripConcurrent()
113114
await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer);
114115

115116
// ensure we get a delivery
116-
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
117+
await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task);
117118

118119
bool result1 = await publish1SyncSource.Task;
119120
Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
@@ -216,7 +217,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
216217
await _channel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer);
217218

218219
// ensure we get a delivery
219-
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
220+
await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task);
220221

221222
bool result1 = await publish1SyncSource.Task;
222223
Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
@@ -226,7 +227,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
226227
}
227228
});
228229

229-
await Task.WhenAll(publishTask, consumeTask);
230+
await AssertRanToCompletion(publishTask, consumeTask);
230231
}
231232

232233
[Fact]

projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,53 +43,54 @@ public class TestAsyncConsumerExceptions : AsyncIntegrationFixture
4343
{
4444
private static readonly Exception TestException = new Exception("oops");
4545

46-
public TestAsyncConsumerExceptions(ITestOutputHelper output) : base(output, true, 1)
46+
public TestAsyncConsumerExceptions(ITestOutputHelper output)
47+
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 1)
4748
{
4849
}
4950

5051
[Fact]
51-
public async Task TestCancelNotificationExceptionHandling()
52+
public Task TestCancelNotificationExceptionHandling()
5253
{
5354
IBasicConsumer consumer = new ConsumerFailingOnCancel(_channel);
54-
await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) =>
55+
return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) =>
5556
{
5657
await ch.QueueDeleteAsync(q, false, false);
5758
});
5859
}
5960

6061
[Fact]
61-
public async Task TestConsumerCancelOkExceptionHandling()
62+
public Task TestConsumerCancelOkExceptionHandling()
6263
{
6364
IBasicConsumer consumer = new ConsumerFailingOnCancelOk(_channel);
64-
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct));
65+
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.BasicCancelAsync(ct));
6566
}
6667

6768
[Fact]
68-
public async Task TestConsumerConsumeOkExceptionHandling()
69+
public Task TestConsumerConsumeOkExceptionHandling()
6970
{
7071
IBasicConsumer consumer = new ConsumerFailingOnConsumeOk(_channel);
71-
await TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield());
72+
return TestExceptionHandlingWith(consumer, async (ch, q, c, ct) => await Task.Yield());
7273
}
7374

7475
[Fact]
75-
public async Task TestConsumerShutdownExceptionHandling()
76+
public Task TestConsumerShutdownExceptionHandling()
7677
{
7778
IBasicConsumer consumer = new ConsumerFailingOnShutdown(_channel);
78-
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync());
79+
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) => ch.CloseAsync());
7980
}
8081

8182
[Fact]
82-
public async Task TestDeliveryExceptionHandling()
83+
public Task TestDeliveryExceptionHandling()
8384
{
8485
IBasicConsumer consumer = new ConsumerFailingOnDelivery(_channel);
85-
await TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
86+
return TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
8687
ch.BasicPublishAsync("", q, _encoding.GetBytes("msg")));
8788
}
8889

8990
protected async Task TestExceptionHandlingWith(IBasicConsumer consumer,
9091
Func<IChannel, string, IBasicConsumer, string, ValueTask> action)
9192
{
92-
var waitSpan = TimeSpan.FromSeconds(2);
93+
var waitSpan = TimeSpan.FromSeconds(5);
9394
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
9495
var cts = new CancellationTokenSource(waitSpan);
9596
cts.Token.Register(() => tcs.TrySetResult(false));

projects/Test/AsyncIntegration/TestBasicGetAsync.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public async Task TestBasicGet()
5757

5858
QueueDeclareOk queueResultPassive = await _channel.QueueDeclareAsync(queueName, true, true, true, true, null);
5959
Assert.Equal((uint)0, queueResultPassive.MessageCount);
60+
61+
Assert.Null(await _channel.BasicGetAsync(queueName, true));
6062
}
6163
}
6264
}

projects/Test/AsyncIntegration/TestBasicPublishAsync.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
// Copyright (c) 2011-2020 VMware, Inc. or its affiliates. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32-
using System;
3332
using System.Threading.Tasks;
3433
using RabbitMQ.Client;
3534
using Xunit;
@@ -53,22 +52,20 @@ public async Task TestQueuePurgeAsync()
5352
await _channel.ConfirmSelectAsync();
5453

5554
QueueDeclareOk q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
56-
string queueName = q.QueueName;
5755

5856
var publishTask = Task.Run(async () =>
5957
{
58+
byte[] body = GetRandomBody(512);
6059
for (int i = 0; i < messageCount; i++)
6160
{
62-
byte[] body = _encoding.GetBytes(Guid.NewGuid().ToString());
63-
await _channel.BasicPublishAsync(string.Empty, queueName, body);
61+
await _channel.BasicPublishAsync(string.Empty, q, body);
6462
}
63+
await _channel.WaitForConfirmsOrDieAsync();
6564
publishSyncSource.SetResult(true);
6665
});
6766

68-
await _channel.WaitForConfirmsOrDieAsync();
6967
Assert.True(await publishSyncSource.Task);
70-
71-
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(queueName));
68+
Assert.Equal((uint)messageCount, await _channel.QueuePurgeAsync(q));
7269
}
7370
}
7471
}

projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -151,9 +151,7 @@ private async Task TestConcurrentChannelOperationsAsync(Func<IConnection, Task>
151151
tasks.Add(action(_conn));
152152
}
153153
}
154-
Task t = Task.WhenAll(tasks);
155-
await t;
156-
Assert.Equal(TaskStatus.RanToCompletion, t.Status);
154+
await AssertRanToCompletion(tasks);
157155

158156
// incorrect frame interleaving in these tests will result
159157
// in an unrecoverable connection-level exception, thus

projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ namespace Test.AsyncIntegration
3838
{
3939
public class TestConfirmSelectAsync : AsyncIntegrationFixture
4040
{
41+
readonly byte[] _message = GetRandomBody(64);
42+
4143
public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
4244
{
4345
}
@@ -63,7 +65,7 @@ public async Task TestConfirmSelectIdempotency()
6365

6466
private ValueTask Publish()
6567
{
66-
return _channel.BasicPublishAsync("", "amq.fanout", _encoding.GetBytes("message"));
68+
return _channel.BasicPublishAsync("", "amq.fanout", _message);
6769
}
6870
}
6971
}

projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public TestExchangeDeclareAsync(ITestOutputHelper output) : base(output)
5050
public async Task TestConcurrentExchangeDeclareAndBindAsync()
5151
{
5252
var exchangeNames = new ConcurrentBag<string>();
53-
var ts = new List<Task>();
53+
var tasks = new List<Task>();
5454
NotSupportedException nse = null;
5555
for (int i = 0; i < 256; i++)
5656
{
@@ -70,12 +70,12 @@ async Task f()
7070
}
7171
}
7272
var t = Task.Run(f);
73-
ts.Add(t);
73+
tasks.Add(t);
7474
}
7575

76-
await Task.WhenAll(ts);
76+
await AssertRanToCompletion(tasks);
7777
Assert.Null(nse);
78-
ts.Clear();
78+
tasks.Clear();
7979

8080
foreach (string exchangeName in exchangeNames)
8181
{
@@ -93,11 +93,18 @@ async Task f()
9393
}
9494
}
9595
var t = Task.Run(f);
96-
ts.Add(t);
96+
tasks.Add(t);
9797
}
9898

99-
await Task.WhenAll(ts);
99+
await AssertRanToCompletion(tasks);
100100
Assert.Null(nse);
101+
102+
async Task AssertRanToCompletion(IEnumerable<Task> tasks)
103+
{
104+
Task whenAllTask = Task.WhenAll(tasks);
105+
await whenAllTask;
106+
Assert.Equal(TaskStatus.RanToCompletion, whenAllTask.Status);
107+
}
101108
}
102109
}
103110
}

projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,8 @@ public async void TestConcurrentQueueDeclareAndBindAsync()
8484
});
8585
};
8686

87-
var ts = new List<Task>();
88-
var qs = new ConcurrentBag<string>();
87+
var tasks = new List<Task>();
88+
var queues = new ConcurrentBag<string>();
8989

9090
NotSupportedException nse = null;
9191
for (int i = 0; i < 256; i++)
@@ -100,23 +100,23 @@ async Task f()
100100
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false, false, false, false, null);
101101
string queueName = r.QueueName;
102102
await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName, null);
103-
qs.Add(queueName);
103+
queues.Add(queueName);
104104
}
105105
catch (NotSupportedException e)
106106
{
107107
nse = e;
108108
}
109109
}
110110
var t = Task.Run(f);
111-
ts.Add(t);
111+
tasks.Add(t);
112112
}
113113

114-
await Task.WhenAll(ts);
114+
await AssertRanToCompletion(tasks);
115115
Assert.Null(nse);
116-
ts.Clear();
116+
tasks.Clear();
117117

118118
nse = null;
119-
foreach (string q in qs)
119+
foreach (string q in queues)
120120
{
121121
async Task f()
122122
{
@@ -139,10 +139,10 @@ async Task f()
139139
}
140140
}
141141
var t = Task.Run(f);
142-
ts.Add(t);
142+
tasks.Add(t);
143143
}
144144

145-
await Task.WhenAll(ts);
145+
await AssertRanToCompletion(tasks);
146146
Assert.Null(nse);
147147
Assert.False(sawShutdown);
148148
}

0 commit comments

Comments
 (0)