Skip to content

Commit 09e7f5c

Browse files
authored
Merge pull request #1173 from rabbitmq/rabbitmq-dotnet-client-1168
Add test to try and repro #1168
2 parents 605dc08 + 623812c commit 09e7f5c

File tree

2 files changed

+217
-95
lines changed

2 files changed

+217
-95
lines changed

projects/Unit/TestAsyncConsumer.cs

Lines changed: 217 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -30,99 +30,206 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Security.Cryptography;
3334
using System.Text;
3435
using System.Threading;
3536
using System.Threading.Tasks;
3637

3738
using RabbitMQ.Client.Events;
3839
using Xunit;
40+
using Xunit.Abstractions;
3941

4042
namespace RabbitMQ.Client.Unit
4143
{
42-
4344
public class TestAsyncConsumer
4445
{
46+
private readonly ITestOutputHelper _output;
47+
48+
public TestAsyncConsumer(ITestOutputHelper output)
49+
{
50+
_output = output;
51+
}
52+
4553
[Fact]
4654
public void TestBasicRoundtrip()
4755
{
4856
var cf = new ConnectionFactory{ DispatchConsumersAsync = true };
4957
using(IConnection c = cf.CreateConnection())
50-
using(IModel m = c.CreateModel())
5158
{
52-
QueueDeclareOk q = m.QueueDeclare();
53-
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
54-
m.BasicPublish("", q.QueueName, body);
55-
var consumer = new AsyncEventingBasicConsumer(m);
56-
var are = new AutoResetEvent(false);
57-
consumer.Received += async (o, a) =>
58-
{
59-
are.Set();
60-
await Task.Yield();
61-
};
62-
string tag = m.BasicConsume(q.QueueName, true, consumer);
63-
// ensure we get a delivery
64-
bool waitRes = are.WaitOne(2000);
65-
Assert.True(waitRes);
66-
// unsubscribe and ensure no further deliveries
67-
m.BasicCancel(tag);
68-
m.BasicPublish("", q.QueueName, body);
69-
bool waitResFalse = are.WaitOne(2000);
70-
Assert.False(waitResFalse);
59+
using(IModel m = c.CreateModel())
60+
{
61+
QueueDeclareOk q = m.QueueDeclare();
62+
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
63+
m.BasicPublish("", q.QueueName, body);
64+
var consumer = new AsyncEventingBasicConsumer(m);
65+
var are = new AutoResetEvent(false);
66+
consumer.Received += async (o, a) =>
67+
{
68+
are.Set();
69+
await Task.Yield();
70+
};
71+
string tag = m.BasicConsume(q.QueueName, true, consumer);
72+
// ensure we get a delivery
73+
bool waitRes = are.WaitOne(2000);
74+
Assert.True(waitRes);
75+
// unsubscribe and ensure no further deliveries
76+
m.BasicCancel(tag);
77+
m.BasicPublish("", q.QueueName, body);
78+
bool waitResFalse = are.WaitOne(2000);
79+
Assert.False(waitResFalse);
80+
}
7181
}
7282
}
7383

7484
[Fact]
7585
public async Task TestBasicRoundtripConcurrent()
7686
{
7787
var cf = new ConnectionFactory{ DispatchConsumersAsync = true, ConsumerDispatchConcurrency = 2 };
78-
using(IConnection c = cf.CreateConnection())
79-
using(IModel m = c.CreateModel())
88+
using (IConnection c = cf.CreateConnection())
8089
{
81-
QueueDeclareOk q = m.QueueDeclare();
82-
const string publish1 = "async-hi-1";
83-
byte[] body = Encoding.UTF8.GetBytes(publish1);
84-
m.BasicPublish("", q.QueueName, body);
85-
const string publish2 = "async-hi-2";
86-
body = Encoding.UTF8.GetBytes(publish2);
87-
m.BasicPublish("", q.QueueName, body);
88-
89-
var consumer = new AsyncEventingBasicConsumer(m);
90-
91-
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
92-
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
93-
var maximumWaitTime = TimeSpan.FromSeconds(5);
94-
var tokenSource = new CancellationTokenSource(maximumWaitTime);
95-
tokenSource.Token.Register(() =>
90+
using (IModel m = c.CreateModel())
9691
{
97-
publish1SyncSource.TrySetResult(false);
98-
publish2SyncSource.TrySetResult(false);
99-
});
92+
QueueDeclareOk q = m.QueueDeclare();
93+
string publish1 = get_unique_string(16384);
94+
byte[] body = Encoding.UTF8.GetBytes(publish1);
95+
m.BasicPublish("", q.QueueName, body);
10096

101-
consumer.Received += async (o, a) =>
102-
{
103-
switch (Encoding.UTF8.GetString(a.Body.ToArray()))
97+
string publish2 = get_unique_string(16384);
98+
body = Encoding.UTF8.GetBytes(publish2);
99+
m.BasicPublish("", q.QueueName, body);
100+
101+
var consumer = new AsyncEventingBasicConsumer(m);
102+
103+
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
104+
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
105+
var maximumWaitTime = TimeSpan.FromSeconds(5);
106+
var tokenSource = new CancellationTokenSource(maximumWaitTime);
107+
tokenSource.Token.Register(() =>
104108
{
105-
case publish1:
109+
publish1SyncSource.TrySetResult(false);
110+
publish2SyncSource.TrySetResult(false);
111+
});
112+
113+
consumer.Received += async (o, a) =>
114+
{
115+
string decoded = Encoding.ASCII.GetString(a.Body.ToArray());
116+
if (decoded == publish1)
117+
{
106118
publish1SyncSource.TrySetResult(true);
107119
await publish2SyncSource.Task;
108-
break;
109-
case publish2:
120+
}
121+
else if (decoded == publish2)
122+
{
110123
publish2SyncSource.TrySetResult(true);
111124
await publish1SyncSource.Task;
112-
break;
113-
}
114-
};
125+
}
126+
};
115127

116-
m.BasicConsume(q.QueueName, true, consumer);
117-
// ensure we get a delivery
128+
m.BasicConsume(q.QueueName, true, consumer);
129+
// ensure we get a delivery
118130

119-
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
131+
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
120132

121-
Assert.True(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
122-
Assert.True(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
133+
Assert.True(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
134+
Assert.True(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
135+
}
123136
}
124137
}
125138

139+
[Fact]
140+
public async Task TestBasicRoundtripConcurrentManyMessages()
141+
{
142+
const int publish_total = 4096;
143+
string queueName = $"{nameof(TestBasicRoundtripConcurrentManyMessages)}-{Guid.NewGuid()}";
144+
145+
string publish1 = get_unique_string(32768);
146+
byte[] body1 = Encoding.ASCII.GetBytes(publish1);
147+
string publish2 = get_unique_string(32768);
148+
byte[] body2 = Encoding.ASCII.GetBytes(publish2);
149+
150+
var cf = new ConnectionFactory{ DispatchConsumersAsync = true, ConsumerDispatchConcurrency = 2 };
151+
152+
using (IConnection c = cf.CreateConnection())
153+
{
154+
using (IModel m = c.CreateModel())
155+
{
156+
QueueDeclareOk q = m.QueueDeclare(queue: queueName, exclusive: false, durable: true);
157+
Assert.Equal(q.QueueName, queueName);
158+
}
159+
}
160+
161+
Task publishTask = Task.Run(() =>
162+
{
163+
using (IConnection c = cf.CreateConnection())
164+
{
165+
using (IModel m = c.CreateModel())
166+
{
167+
QueueDeclareOk q = m.QueueDeclare(queue: queueName, exclusive: false, durable: true);
168+
for (int i = 0; i < publish_total; i++)
169+
{
170+
m.BasicPublish(string.Empty, queueName, body1);
171+
m.BasicPublish(string.Empty, queueName, body2);
172+
}
173+
}
174+
}
175+
});
176+
177+
Task consumeTask = Task.Run(() =>
178+
{
179+
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
180+
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
181+
var maximumWaitTime = TimeSpan.FromSeconds(10);
182+
var tokenSource = new CancellationTokenSource(maximumWaitTime);
183+
tokenSource.Token.Register(() =>
184+
{
185+
publish1SyncSource.TrySetResult(false);
186+
publish2SyncSource.TrySetResult(false);
187+
});
188+
189+
using (IConnection c = cf.CreateConnection())
190+
{
191+
using (IModel m = c.CreateModel())
192+
{
193+
var consumer = new AsyncEventingBasicConsumer(m);
194+
195+
int publish1_count = 0;
196+
int publish2_count = 0;
197+
198+
consumer.Received += async (o, a) =>
199+
{
200+
string decoded = Encoding.ASCII.GetString(a.Body.ToArray());
201+
if (decoded == publish1)
202+
{
203+
if (Interlocked.Increment(ref publish1_count) >= publish_total)
204+
{
205+
publish1SyncSource.TrySetResult(true);
206+
await publish2SyncSource.Task;
207+
}
208+
}
209+
else if (decoded == publish2)
210+
{
211+
if (Interlocked.Increment(ref publish2_count) >= publish_total)
212+
{
213+
publish2SyncSource.TrySetResult(true);
214+
await publish1SyncSource.Task;
215+
}
216+
}
217+
};
218+
219+
m.BasicConsume(queueName, true, consumer);
220+
221+
// ensure we get a delivery
222+
Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
223+
224+
Assert.True(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
225+
Assert.True(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
226+
}
227+
}
228+
});
229+
230+
await Task.WhenAll(publishTask, consumeTask);
231+
}
232+
126233
[Fact]
127234
public void TestBasicRoundtripNoWait()
128235
{
@@ -164,47 +271,49 @@ public void ConcurrentEventingTestForReceived()
164271

165272
var cf = new ConnectionFactory{ DispatchConsumersAsync = true };
166273
using (IConnection c = cf.CreateConnection())
167-
using (IModel m = c.CreateModel())
168274
{
169-
QueueDeclareOk q = m.QueueDeclare();
170-
var consumer = new AsyncEventingBasicConsumer(m);
171-
m.BasicConsume(q.QueueName, true, consumer);
172-
var countdownEvent = new CountdownEvent(NumberOfThreads);
173-
var tasks = new Task[NumberOfThreads];
174-
for (int i = 0; i < NumberOfThreads; i++)
275+
using (IModel m = c.CreateModel())
175276
{
176-
int threadIndex = i;
177-
tasks[i] = Task.Run(() =>
277+
QueueDeclareOk q = m.QueueDeclare();
278+
var consumer = new AsyncEventingBasicConsumer(m);
279+
m.BasicConsume(q.QueueName, true, consumer);
280+
var countdownEvent = new CountdownEvent(NumberOfThreads);
281+
var tasks = new Task[NumberOfThreads];
282+
for (int i = 0; i < NumberOfThreads; i++)
178283
{
179-
countdownEvent.Signal();
180-
countdownEvent.Wait();
181-
int start = threadIndex * NumberOfRegistrations;
182-
for (int j = start; j < start + NumberOfRegistrations; j++)
284+
int threadIndex = i;
285+
tasks[i] = Task.Run(() =>
183286
{
184-
int receivedIndex = j;
185-
consumer.Received += (sender, eventArgs) =>
287+
countdownEvent.Signal();
288+
countdownEvent.Wait();
289+
int start = threadIndex * NumberOfRegistrations;
290+
for (int j = start; j < start + NumberOfRegistrations; j++)
186291
{
187-
called[receivedIndex] = 1;
188-
return Task.CompletedTask;
189-
};
190-
}
191-
});
192-
}
292+
int receivedIndex = j;
293+
consumer.Received += (sender, eventArgs) =>
294+
{
295+
called[receivedIndex] = 1;
296+
return Task.CompletedTask;
297+
};
298+
}
299+
});
300+
}
193301

194-
countdownEvent.Wait();
195-
Task.WaitAll(tasks);
302+
countdownEvent.Wait();
303+
Task.WaitAll(tasks);
196304

197-
// Add last receiver
198-
var are = new AutoResetEvent(false);
199-
consumer.Received += (o, a) =>
200-
{
201-
are.Set();
202-
return Task.CompletedTask;
203-
};
305+
// Add last receiver
306+
var are = new AutoResetEvent(false);
307+
consumer.Received += (o, a) =>
308+
{
309+
are.Set();
310+
return Task.CompletedTask;
311+
};
204312

205-
// Send message
206-
m.BasicPublish("", q.QueueName, ReadOnlyMemory<byte>.Empty);
207-
are.WaitOne(TimingFixture.TestTimeout);
313+
// Send message
314+
m.BasicPublish("", q.QueueName, ReadOnlyMemory<byte>.Empty);
315+
are.WaitOne(TimingFixture.TestTimeout);
316+
}
208317
}
209318

210319
// Check received messages
@@ -216,13 +325,27 @@ public void NonAsyncConsumerShouldThrowInvalidOperationException()
216325
{
217326
var cf = new ConnectionFactory{ DispatchConsumersAsync = true };
218327
using(IConnection c = cf.CreateConnection())
219-
using(IModel m = c.CreateModel())
220328
{
221-
QueueDeclareOk q = m.QueueDeclare();
222-
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
223-
m.BasicPublish("", q.QueueName, body);
224-
var consumer = new EventingBasicConsumer(m);
225-
Assert.Throws<InvalidOperationException>(() => m.BasicConsume(q.QueueName, false, consumer));
329+
using(IModel m = c.CreateModel())
330+
{
331+
QueueDeclareOk q = m.QueueDeclare();
332+
byte[] body = System.Text.Encoding.UTF8.GetBytes("async-hi");
333+
m.BasicPublish("", q.QueueName, body);
334+
var consumer = new EventingBasicConsumer(m);
335+
Assert.Throws<InvalidOperationException>(() => m.BasicConsume(q.QueueName, false, consumer));
336+
}
337+
}
338+
}
339+
340+
private string get_unique_string(int string_length)
341+
{
342+
using (var rng = RandomNumberGenerator.Create())
343+
{
344+
var bit_count = (string_length * 6);
345+
var byte_count = ((bit_count + 7) / 8); // rounded up
346+
var bytes = new byte[byte_count];
347+
rng.GetBytes(bytes);
348+
return Convert.ToBase64String(bytes);
226349
}
227350
}
228351
}

projects/Unit/TestEventingConsumer.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838

3939
namespace RabbitMQ.Client.Unit
4040
{
41-
4241
public class TestEventingConsumer : IntegrationFixture
4342
{
4443
public TestEventingConsumer(ITestOutputHelper output) : base(output)

0 commit comments

Comments
 (0)