Skip to content

Commit 162e193

Browse files
committed
Quick spike to see how to achieve concurrent dispatch
1 parent c93144a commit 162e193

File tree

6 files changed

+228
-12
lines changed

6 files changed

+228
-12
lines changed

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,16 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
175175
/// </summary>
176176
public bool DispatchConsumersAsync { get; set; } = false;
177177

178+
/// <summary>
179+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
180+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
181+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
182+
/// Defaults to 1.
183+
/// </summary>
184+
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
185+
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
186+
public int ProcessingConcurrency { get; set; } = 1;
187+
178188
/// <summary>The host to connect to.</summary>
179189
public string HostName { get; set; } = "localhost";
180190

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ namespace RabbitMQ.Client.Impl
99
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1010
{
1111
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
12-
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
12+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc;
13+
14+
public AsyncConsumerWorkService(int concurrency) : base(concurrency)
15+
{
16+
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
17+
}
1318

1419
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1520
{
@@ -22,9 +27,9 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2227
workPool.Enqueue(work);
2328
}
2429

25-
private static WorkPool StartNewWorkPool(IModel model)
30+
private WorkPool StartNewWorkPool(IModel model)
2631
{
27-
var newWorkPool = new WorkPool(model as ModelBase);
32+
var newWorkPool = new WorkPool(model as ModelBase, _concurrency);
2833
newWorkPool.Start();
2934
return newWorkPool;
3035
}
@@ -44,16 +49,29 @@ class WorkPool
4449
readonly Channel<Work> _channel;
4550
readonly ModelBase _model;
4651
private Task _worker;
52+
private readonly int _concurrency;
53+
private SemaphoreSlim _limiter;
54+
private CancellationTokenSource _tokenSource;
4755

48-
public WorkPool(ModelBase model)
56+
public WorkPool(ModelBase model, int concurrency)
4957
{
58+
_concurrency = concurrency;
5059
_model = model;
5160
_channel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
5261
}
5362

5463
public void Start()
5564
{
56-
_worker = Task.Run(Loop, CancellationToken.None);
65+
if (_concurrency == 1)
66+
{
67+
_worker = Task.Run(Loop, CancellationToken.None);
68+
}
69+
else
70+
{
71+
_limiter = new SemaphoreSlim(_concurrency);
72+
_tokenSource = new CancellationTokenSource();
73+
_worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None);
74+
}
5775
}
5876

5977
public void Enqueue(Work work)
@@ -83,9 +101,55 @@ async Task Loop()
83101
}
84102
}
85103

104+
async Task LoopWithConcurrency(CancellationToken cancellationToken)
105+
{
106+
try
107+
{
108+
while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
109+
{
110+
while (_channel.Reader.TryRead(out Work work))
111+
{
112+
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
113+
if(!_limiter.Wait(0))
114+
{
115+
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
116+
}
117+
118+
_ = HandleConcurrent(work, _model, _limiter);
119+
}
120+
}
121+
}
122+
catch (OperationCanceledException)
123+
{
124+
// ignored
125+
}
126+
}
127+
128+
static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter)
129+
{
130+
try
131+
{
132+
Task task = work.Execute(model);
133+
if (!task.IsCompleted)
134+
{
135+
await task.ConfigureAwait(false);
136+
}
137+
}
138+
catch (Exception)
139+
{
140+
141+
}
142+
finally
143+
{
144+
limiter.Release();
145+
}
146+
}
147+
86148
public Task Stop()
87149
{
88150
_channel.Writer.Complete();
151+
_tokenSource?.Cancel();
152+
_limiter?.Dispose();
89153
return _worker;
90154
}
91155
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
110110
_factory = factory;
111111
_frameHandler = frameHandler;
112112

113+
int processingConcurrency = (factory as ConnectionFactory)?.ProcessingConcurrency ?? 1;
113114
if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
114115
{
115-
ConsumerWorkService = new AsyncConsumerWorkService();
116+
ConsumerWorkService = new AsyncConsumerWorkService(processingConcurrency);
116117
}
117118
else
118119
{
119-
ConsumerWorkService = new ConsumerWorkService();
120+
ConsumerWorkService = new ConsumerWorkService(processingConcurrency);
120121
}
121122

122123
_sessionManager = new SessionManager(this, 0);

projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@ namespace RabbitMQ.Client.Impl
88
internal class ConsumerWorkService
99
{
1010
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
11-
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
11+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc;
12+
protected readonly int _concurrency;
13+
14+
public ConsumerWorkService(int concurrency)
15+
{
16+
_concurrency = concurrency;
17+
18+
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
19+
}
1220

1321
public void AddWork(IModel model, Action fn)
1422
{
@@ -21,9 +29,9 @@ public void AddWork(IModel model, Action fn)
2129
workPool.Enqueue(fn);
2230
}
2331

24-
private static WorkPool StartNewWorkPool(IModel model)
32+
private WorkPool StartNewWorkPool(IModel model)
2533
{
26-
var newWorkPool = new WorkPool();
34+
var newWorkPool = new WorkPool(_concurrency);
2735
newWorkPool.Start();
2836
return newWorkPool;
2937
}
@@ -57,18 +65,29 @@ class WorkPool
5765
readonly CancellationTokenSource _tokenSource;
5866
readonly CancellationTokenRegistration _tokenRegistration;
5967
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
68+
private readonly int _concurrency;
6069
private Task _worker;
70+
private SemaphoreSlim _limiter;
6171

62-
public WorkPool()
72+
public WorkPool(int concurrency)
6373
{
74+
_concurrency = concurrency;
6475
_actions = new ConcurrentQueue<Action>();
6576
_tokenSource = new CancellationTokenSource();
6677
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
6778
}
6879

6980
public void Start()
7081
{
71-
_worker = Task.Run(Loop, CancellationToken.None);
82+
if (_concurrency == 1)
83+
{
84+
_worker = Task.Run(Loop, CancellationToken.None);
85+
}
86+
else
87+
{
88+
_limiter = new SemaphoreSlim(_concurrency);
89+
_worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None);
90+
}
7291
}
7392

7493
public void Enqueue(Action action)
@@ -105,10 +124,54 @@ async Task Loop()
105124
}
106125
}
107126

127+
async Task LoopWithConcurrency(CancellationToken cancellationToken)
128+
{
129+
while (_tokenSource.IsCancellationRequested == false)
130+
{
131+
try
132+
{
133+
await _syncSource.Task.ConfigureAwait(false);
134+
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
135+
}
136+
catch (TaskCanceledException)
137+
{
138+
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
139+
}
140+
141+
while (_actions.TryDequeue(out Action action))
142+
{
143+
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
144+
if(!_limiter.Wait(0))
145+
{
146+
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
147+
}
148+
149+
_ = OffloadToWorkerThreadPool(action, _limiter);
150+
}
151+
}
152+
}
153+
154+
static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter)
155+
{
156+
try
157+
{
158+
await Task.Run(() => action());
159+
}
160+
catch (Exception)
161+
{
162+
// ignored
163+
}
164+
finally
165+
{
166+
limiter.Release();
167+
}
168+
}
169+
108170
public Task Stop()
109171
{
110172
_tokenSource.Cancel();
111173
_tokenRegistration.Dispose();
174+
_limiter?.Dispose();
112175
return _worker;
113176
}
114177
}

projects/Unit/TestAsyncConsumer.cs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,40 @@ public void TestBasicRoundtrip()
8181
}
8282
}
8383

84+
[Test]
85+
public void TestBasicRoundtripConcurrent()
86+
{
87+
var cf = new ConnectionFactory{ DispatchConsumersAsync = true, ProcessingConcurrency = 2 };
88+
using(IConnection c = cf.CreateConnection())
89+
using(IModel m = c.CreateModel())
90+
{
91+
QueueDeclareOk q = m.QueueDeclare();
92+
IBasicProperties bp = m.CreateBasicProperties();
93+
var body = System.Text.Encoding.UTF8.GetBytes("async-hi-1");
94+
m.BasicPublish("", q.QueueName, bp, body);
95+
body = System.Text.Encoding.UTF8.GetBytes("async-hi-2");
96+
m.BasicPublish("", q.QueueName, bp, body);
97+
var consumer = new AsyncEventingBasicConsumer(m);
98+
var are = new CountdownEvent(2);
99+
consumer.Received += async (o, a) =>
100+
{
101+
await Task.Delay(500);
102+
are.Signal();
103+
};
104+
string tag = m.BasicConsume(q.QueueName, true, consumer);
105+
// ensure we get a delivery
106+
bool waitRes = are.Wait(800);
107+
Assert.IsTrue(waitRes);
108+
109+
are.Reset(1);
110+
// unsubscribe and ensure no further deliveries
111+
m.BasicCancel(tag);
112+
m.BasicPublish("", q.QueueName, bp, body);
113+
bool waitResFalse = are.Wait(2000);
114+
Assert.IsFalse(waitResFalse);
115+
}
116+
}
117+
84118
[Test]
85119
public void TestBasicRoundtripNoWait()
86120
{

projects/Unit/TestConsumer.cs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
using System.Threading;
2+
using NUnit.Framework;
3+
using RabbitMQ.Client.Events;
4+
5+
namespace RabbitMQ.Client.Unit
6+
{
7+
[TestFixture]
8+
public class TestConsumer
9+
{
10+
[Test]
11+
public void TestBasicRoundtripConcurrent()
12+
{
13+
var cf = new ConnectionFactory{ ProcessingConcurrency = 2 };
14+
using(IConnection c = cf.CreateConnection())
15+
using(IModel m = c.CreateModel())
16+
{
17+
QueueDeclareOk q = m.QueueDeclare();
18+
IBasicProperties bp = m.CreateBasicProperties();
19+
var body = System.Text.Encoding.UTF8.GetBytes("async-hi-1");
20+
m.BasicPublish("", q.QueueName, bp, body);
21+
body = System.Text.Encoding.UTF8.GetBytes("async-hi-2");
22+
m.BasicPublish("", q.QueueName, bp, body);
23+
var consumer = new EventingBasicConsumer(m);
24+
var are = new CountdownEvent(2);
25+
consumer.Received += (o, a) =>
26+
{
27+
Thread.Sleep(500);
28+
are.Signal();
29+
};
30+
string tag = m.BasicConsume(q.QueueName, true, consumer);
31+
// ensure we get a delivery
32+
bool waitRes = are.Wait(800);
33+
Assert.IsTrue(waitRes);
34+
35+
are.Reset(1);
36+
// unsubscribe and ensure no further deliveries
37+
m.BasicCancel(tag);
38+
m.BasicPublish("", q.QueueName, bp, body);
39+
bool waitResFalse = are.Wait(2000);
40+
Assert.IsFalse(waitResFalse);
41+
}
42+
}
43+
}
44+
}

0 commit comments

Comments
 (0)