Skip to content

Commit 4f837c4

Browse files
committed
Concurrent dispatch as opt-in
1 parent fbbbeba commit 4f837c4

File tree

2 files changed

+73
-32
lines changed

2 files changed

+73
-32
lines changed

projects/Unit/TestAsyncConsumer.cs

Lines changed: 35 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Text;
4243
using System.Threading;
4344
using System.Threading.Tasks;
4445

@@ -82,36 +83,55 @@ public void TestBasicRoundtrip()
8283
}
8384

8485
[Test]
85-
public void TestBasicRoundtripConcurrent()
86+
public async Task TestBasicRoundtripConcurrent()
8687
{
8788
var cf = new ConnectionFactory{ DispatchConsumersAsync = true, ProcessingConcurrency = 2 };
8889
using(IConnection c = cf.CreateConnection())
8990
using(IModel m = c.CreateModel())
9091
{
9192
QueueDeclareOk q = m.QueueDeclare();
9293
IBasicProperties bp = m.CreateBasicProperties();
93-
var body = System.Text.Encoding.UTF8.GetBytes("async-hi-1");
94+
const string publish1 = "async-hi-1";
95+
var body = Encoding.UTF8.GetBytes(publish1);
9496
m.BasicPublish("", q.QueueName, bp, body);
95-
body = System.Text.Encoding.UTF8.GetBytes("async-hi-2");
97+
const string publish2 = "async-hi-2";
98+
body = Encoding.UTF8.GetBytes(publish2);
9699
m.BasicPublish("", q.QueueName, bp, body);
100+
97101
var consumer = new AsyncEventingBasicConsumer(m);
98-
var are = new CountdownEvent(2);
102+
103+
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
104+
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
105+
var maximumWaitTime = TimeSpan.FromSeconds(5);
106+
var tokenSource = new CancellationTokenSource(maximumWaitTime);
107+
tokenSource.Token.Register(() =>
108+
{
109+
publish1SyncSource.TrySetResult(false);
110+
publish2SyncSource.TrySetResult(false);
111+
});
112+
99113
consumer.Received += async (o, a) =>
100114
{
101-
await Task.Delay(500);
102-
are.Signal();
115+
switch (Encoding.UTF8.GetString(a.Body.ToArray()))
116+
{
117+
case publish1:
118+
publish1SyncSource.TrySetResult(true);
119+
await publish2SyncSource.Task;
120+
break;
121+
case publish2:
122+
publish2SyncSource.TrySetResult(true);
123+
await publish1SyncSource.Task;
124+
break;
125+
}
103126
};
104-
string tag = m.BasicConsume(q.QueueName, true, consumer);
127+
128+
m.BasicConsume(q.QueueName, true, consumer);
105129
// ensure we get a delivery
106-
bool waitRes = are.Wait(800);
107-
Assert.IsTrue(waitRes);
108130

109-
are.Reset(1);
110-
// unsubscribe and ensure no further deliveries
111-
m.BasicCancel(tag);
112-
m.BasicPublish("", q.QueueName, bp, body);
113-
bool waitResFalse = are.Wait(2000);
114-
Assert.IsFalse(waitResFalse);
131+
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
132+
133+
Assert.IsTrue(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
134+
Assert.IsTrue(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
115135
}
116136
}
117137

projects/Unit/TestConsumer.cs

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
using System;
2+
using System.Text;
13
using System.Threading;
4+
using System.Threading.Tasks;
25
using NUnit.Framework;
36
using RabbitMQ.Client.Events;
47

@@ -8,36 +11,54 @@ namespace RabbitMQ.Client.Unit
811
public class TestConsumer
912
{
1013
[Test]
11-
public void TestBasicRoundtripConcurrent()
14+
public async Task TestBasicRoundtripConcurrent()
1215
{
13-
var cf = new ConnectionFactory{ ProcessingConcurrency = 2 };
16+
var cf = new ConnectionFactory{ ProcessingConcurrency = 1 };
1417
using(IConnection c = cf.CreateConnection())
1518
using(IModel m = c.CreateModel())
1619
{
1720
QueueDeclareOk q = m.QueueDeclare();
1821
IBasicProperties bp = m.CreateBasicProperties();
19-
var body = System.Text.Encoding.UTF8.GetBytes("async-hi-1");
22+
const string publish1 = "sync-hi-1";
23+
var body = Encoding.UTF8.GetBytes(publish1);
2024
m.BasicPublish("", q.QueueName, bp, body);
21-
body = System.Text.Encoding.UTF8.GetBytes("async-hi-2");
25+
const string publish2 = "sync-hi-2";
26+
body = Encoding.UTF8.GetBytes(publish2);
2227
m.BasicPublish("", q.QueueName, bp, body);
28+
2329
var consumer = new EventingBasicConsumer(m);
24-
var are = new CountdownEvent(2);
30+
31+
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
32+
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
33+
var maximumWaitTime = TimeSpan.FromSeconds(5);
34+
var tokenSource = new CancellationTokenSource(maximumWaitTime);
35+
tokenSource.Token.Register(() =>
36+
{
37+
publish1SyncSource.TrySetResult(false);
38+
publish2SyncSource.TrySetResult(false);
39+
});
40+
2541
consumer.Received += (o, a) =>
2642
{
27-
Thread.Sleep(500);
28-
are.Signal();
43+
switch (Encoding.UTF8.GetString(a.Body.ToArray()))
44+
{
45+
case publish1:
46+
publish1SyncSource.TrySetResult(true);
47+
publish2SyncSource.Task.GetAwaiter().GetResult();
48+
break;
49+
case publish2:
50+
publish2SyncSource.TrySetResult(true);
51+
publish1SyncSource.Task.GetAwaiter().GetResult();
52+
break;
53+
}
2954
};
30-
string tag = m.BasicConsume(q.QueueName, true, consumer);
31-
// ensure we get a delivery
32-
bool waitRes = are.Wait(800);
33-
Assert.IsTrue(waitRes);
3455

35-
are.Reset(1);
36-
// unsubscribe and ensure no further deliveries
37-
m.BasicCancel(tag);
38-
m.BasicPublish("", q.QueueName, bp, body);
39-
bool waitResFalse = are.Wait(2000);
40-
Assert.IsFalse(waitResFalse);
56+
m.BasicConsume(q.QueueName, true, consumer);
57+
58+
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
59+
60+
Assert.IsTrue(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
61+
Assert.IsTrue(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
4162
}
4263
}
4364
}

0 commit comments

Comments
 (0)