Skip to content

Commit 05d0930

Browse files
authored
Merge pull request #866 from danielmarbach/concurrent-dispatch
Ability to do concurrent dispatches both on the async as well as the sync consumer
2 parents f088565 + 1cee677 commit 05d0930

File tree

7 files changed

+270
-12
lines changed

7 files changed

+270
-12
lines changed

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,16 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
175175
/// </summary>
176176
public bool DispatchConsumersAsync { get; set; } = false;
177177

178+
/// <summary>
179+
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IBasicConsumer"/>
180+
/// will be offloaded to the worker thread pool so it is important to choose the value for the concurrency wisely to avoid thread pool overloading.
181+
/// <see cref="IAsyncBasicConsumer"/> can handle concurrency much more efficiently due to the non-blocking nature of the consumer.
182+
/// Defaults to 1.
183+
/// </summary>
184+
/// <remarks>For concurrency greater than one this removes the guarantee that consumers handle messages in the order they receive them.
185+
/// In addition to that consumers need to be thread/concurrency safe.</remarks>
186+
public int ProcessingConcurrency { get; set; } = 1;
187+
178188
/// <summary>The host to connect to.</summary>
179189
public string HostName { get; set; } = "localhost";
180190

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,12 @@ namespace RabbitMQ.Client.Impl
99
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1010
{
1111
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
12-
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
12+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc;
13+
14+
public AsyncConsumerWorkService(int concurrency) : base(concurrency)
15+
{
16+
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
17+
}
1318

1419
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1520
{
@@ -22,9 +27,9 @@ public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
2227
workPool.Enqueue(work);
2328
}
2429

25-
private static WorkPool StartNewWorkPool(IModel model)
30+
private WorkPool StartNewWorkPool(IModel model)
2631
{
27-
var newWorkPool = new WorkPool(model as ModelBase);
32+
var newWorkPool = new WorkPool(model as ModelBase, _concurrency);
2833
newWorkPool.Start();
2934
return newWorkPool;
3035
}
@@ -44,16 +49,29 @@ class WorkPool
4449
readonly Channel<Work> _channel;
4550
readonly ModelBase _model;
4651
private Task _worker;
52+
private readonly int _concurrency;
53+
private SemaphoreSlim _limiter;
54+
private CancellationTokenSource _tokenSource;
4755

48-
public WorkPool(ModelBase model)
56+
public WorkPool(ModelBase model, int concurrency)
4957
{
58+
_concurrency = concurrency;
5059
_model = model;
5160
_channel = Channel.CreateUnbounded<Work>(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false });
5261
}
5362

5463
public void Start()
5564
{
56-
_worker = Task.Run(Loop, CancellationToken.None);
65+
if (_concurrency == 1)
66+
{
67+
_worker = Task.Run(Loop, CancellationToken.None);
68+
}
69+
else
70+
{
71+
_limiter = new SemaphoreSlim(_concurrency);
72+
_tokenSource = new CancellationTokenSource();
73+
_worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None);
74+
}
5775
}
5876

5977
public void Enqueue(Work work)
@@ -83,9 +101,55 @@ async Task Loop()
83101
}
84102
}
85103

104+
async Task LoopWithConcurrency(CancellationToken cancellationToken)
105+
{
106+
try
107+
{
108+
while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
109+
{
110+
while (_channel.Reader.TryRead(out Work work))
111+
{
112+
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
113+
if(!_limiter.Wait(0))
114+
{
115+
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
116+
}
117+
118+
_ = HandleConcurrent(work, _model, _limiter);
119+
}
120+
}
121+
}
122+
catch (OperationCanceledException)
123+
{
124+
// ignored
125+
}
126+
}
127+
128+
static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim limiter)
129+
{
130+
try
131+
{
132+
Task task = work.Execute(model);
133+
if (!task.IsCompleted)
134+
{
135+
await task.ConfigureAwait(false);
136+
}
137+
}
138+
catch (Exception)
139+
{
140+
141+
}
142+
finally
143+
{
144+
limiter.Release();
145+
}
146+
}
147+
86148
public Task Stop()
87149
{
88150
_channel.Writer.Complete();
151+
_tokenSource?.Cancel();
152+
_limiter?.Dispose();
89153
return _worker;
90154
}
91155
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,14 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
110110
_factory = factory;
111111
_frameHandler = frameHandler;
112112

113+
int processingConcurrency = (factory as ConnectionFactory)?.ProcessingConcurrency ?? 1;
113114
if (factory is IAsyncConnectionFactory asyncConnectionFactory && asyncConnectionFactory.DispatchConsumersAsync)
114115
{
115-
ConsumerWorkService = new AsyncConsumerWorkService();
116+
ConsumerWorkService = new AsyncConsumerWorkService(processingConcurrency);
116117
}
117118
else
118119
{
119-
ConsumerWorkService = new ConsumerWorkService();
120+
ConsumerWorkService = new ConsumerWorkService(processingConcurrency);
120121
}
121122

122123
_sessionManager = new SessionManager(this, 0);

projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs

Lines changed: 68 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,15 @@ namespace RabbitMQ.Client.Impl
88
internal class ConsumerWorkService
99
{
1010
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
11-
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc = model => StartNewWorkPool(model);
11+
private readonly Func<IModel, WorkPool> _startNewWorkPoolFunc;
12+
protected readonly int _concurrency;
13+
14+
public ConsumerWorkService(int concurrency)
15+
{
16+
_concurrency = concurrency;
17+
18+
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
19+
}
1220

1321
public void AddWork(IModel model, Action fn)
1422
{
@@ -21,9 +29,9 @@ public void AddWork(IModel model, Action fn)
2129
workPool.Enqueue(fn);
2230
}
2331

24-
private static WorkPool StartNewWorkPool(IModel model)
32+
private WorkPool StartNewWorkPool(IModel model)
2533
{
26-
var newWorkPool = new WorkPool();
34+
var newWorkPool = new WorkPool(_concurrency);
2735
newWorkPool.Start();
2836
return newWorkPool;
2937
}
@@ -57,18 +65,29 @@ class WorkPool
5765
readonly CancellationTokenSource _tokenSource;
5866
readonly CancellationTokenRegistration _tokenRegistration;
5967
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
68+
private readonly int _concurrency;
6069
private Task _worker;
70+
private SemaphoreSlim _limiter;
6171

62-
public WorkPool()
72+
public WorkPool(int concurrency)
6373
{
74+
_concurrency = concurrency;
6475
_actions = new ConcurrentQueue<Action>();
6576
_tokenSource = new CancellationTokenSource();
6677
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
6778
}
6879

6980
public void Start()
7081
{
71-
_worker = Task.Run(Loop, CancellationToken.None);
82+
if (_concurrency == 1)
83+
{
84+
_worker = Task.Run(Loop, CancellationToken.None);
85+
}
86+
else
87+
{
88+
_limiter = new SemaphoreSlim(_concurrency);
89+
_worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None);
90+
}
7291
}
7392

7493
public void Enqueue(Action action)
@@ -105,10 +124,54 @@ async Task Loop()
105124
}
106125
}
107126

127+
async Task LoopWithConcurrency(CancellationToken cancellationToken)
128+
{
129+
while (_tokenSource.IsCancellationRequested == false)
130+
{
131+
try
132+
{
133+
await _syncSource.Task.ConfigureAwait(false);
134+
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
135+
}
136+
catch (TaskCanceledException)
137+
{
138+
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
139+
}
140+
141+
while (_actions.TryDequeue(out Action action))
142+
{
143+
// Do a quick synchronous check before we resort to async/await with the state-machine overhead.
144+
if(!_limiter.Wait(0))
145+
{
146+
await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false);
147+
}
148+
149+
_ = OffloadToWorkerThreadPool(action, _limiter);
150+
}
151+
}
152+
}
153+
154+
static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter)
155+
{
156+
try
157+
{
158+
await Task.Run(() => action());
159+
}
160+
catch (Exception)
161+
{
162+
// ignored
163+
}
164+
finally
165+
{
166+
limiter.Release();
167+
}
168+
}
169+
108170
public Task Stop()
109171
{
110172
_tokenSource.Cancel();
111173
_tokenRegistration.Dispose();
174+
_limiter?.Dispose();
112175
return _worker;
113176
}
114177
}

projects/Unit/APIApproval.Approve.verified.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ namespace RabbitMQ.Client
8787
public System.TimeSpan NetworkRecoveryInterval { get; set; }
8888
public string Password { get; set; }
8989
public int Port { get; set; }
90+
public int ProcessingConcurrency { get; set; }
9091
public ushort RequestedChannelMax { get; set; }
9192
public System.TimeSpan RequestedConnectionTimeout { get; set; }
9293
public uint RequestedFrameMax { get; set; }

projects/Unit/TestAsyncConsumer.cs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Text;
4243
using System.Threading;
4344
using System.Threading.Tasks;
4445

@@ -81,6 +82,59 @@ public void TestBasicRoundtrip()
8182
}
8283
}
8384

85+
[Test]
86+
public async Task TestBasicRoundtripConcurrent()
87+
{
88+
var cf = new ConnectionFactory{ DispatchConsumersAsync = true, ProcessingConcurrency = 2 };
89+
using(IConnection c = cf.CreateConnection())
90+
using(IModel m = c.CreateModel())
91+
{
92+
QueueDeclareOk q = m.QueueDeclare();
93+
IBasicProperties bp = m.CreateBasicProperties();
94+
const string publish1 = "async-hi-1";
95+
var body = Encoding.UTF8.GetBytes(publish1);
96+
m.BasicPublish("", q.QueueName, bp, body);
97+
const string publish2 = "async-hi-2";
98+
body = Encoding.UTF8.GetBytes(publish2);
99+
m.BasicPublish("", q.QueueName, bp, body);
100+
101+
var consumer = new AsyncEventingBasicConsumer(m);
102+
103+
var publish1SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
104+
var publish2SyncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
105+
var maximumWaitTime = TimeSpan.FromSeconds(5);
106+
var tokenSource = new CancellationTokenSource(maximumWaitTime);
107+
tokenSource.Token.Register(() =>
108+
{
109+
publish1SyncSource.TrySetResult(false);
110+
publish2SyncSource.TrySetResult(false);
111+
});
112+
113+
consumer.Received += async (o, a) =>
114+
{
115+
switch (Encoding.UTF8.GetString(a.Body.ToArray()))
116+
{
117+
case publish1:
118+
publish1SyncSource.TrySetResult(true);
119+
await publish2SyncSource.Task;
120+
break;
121+
case publish2:
122+
publish2SyncSource.TrySetResult(true);
123+
await publish1SyncSource.Task;
124+
break;
125+
}
126+
};
127+
128+
m.BasicConsume(q.QueueName, true, consumer);
129+
// ensure we get a delivery
130+
131+
await Task.WhenAll(publish1SyncSource.Task, publish2SyncSource.Task);
132+
133+
Assert.IsTrue(publish1SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
134+
Assert.IsTrue(publish2SyncSource.Task.Result, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}");
135+
}
136+
}
137+
84138
[Test]
85139
public void TestBasicRoundtripNoWait()
86140
{

0 commit comments

Comments
 (0)