Skip to content

Commit 57f50b0

Browse files
michaelklishinlukebakken
authored andcommitted
Merge pull request #775 from danielmarbach/fix-async-consumer-loop
Fix AsyncConsumerWorkService (cherry picked from commit a11303e)
1 parent 6ed209e commit 57f50b0

File tree

1 file changed

+50
-24
lines changed

1 file changed

+50
-24
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
using RabbitMQ.Client.Impl;
2-
3-
using System.Collections.Concurrent;
1+
using System.Collections.Concurrent;
2+
using System.Reflection;
43
using System.Threading;
54
using System.Threading.Tasks;
65

6+
using RabbitMQ.Client.Impl;
7+
78
namespace RabbitMQ.Client
89
{
9-
internal class AsyncConsumerWorkService : ConsumerWorkService
10+
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1011
{
11-
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
12+
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
1213

1314
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1415
{
15-
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
16+
_workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
1617
}
1718

1819
private WorkPool StartNewWorkPool(IModel model)
@@ -24,69 +25,94 @@ private WorkPool StartNewWorkPool(IModel model)
2425

2526
public void Stop(IModel model)
2627
{
27-
if (workPools.TryRemove(model, out WorkPool workPool))
28+
if (_workPools.TryRemove(model, out WorkPool workPool))
2829
{
2930
workPool.Stop();
3031
}
3132
}
3233

3334
public void Stop()
3435
{
35-
foreach (IModel model in workPools.Keys)
36+
foreach (IModel model in _workPools.Keys)
3637
{
3738
Stop(model);
3839
}
3940
}
4041

4142
class WorkPool
4243
{
43-
readonly ConcurrentQueue<Work> workQueue;
44-
readonly CancellationTokenSource tokenSource;
45-
readonly ModelBase model;
46-
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
47-
private Task task;
44+
readonly ConcurrentQueue<Work> _workQueue;
45+
readonly CancellationTokenSource _tokenSource;
46+
readonly ModelBase _model;
47+
readonly CancellationTokenRegistration _tokenRegistration;
48+
volatile TaskCompletionSource<bool> _syncSource = TaskCompletionSourceFactory.Create<bool>();
49+
private Task _worker;
4850

4951
public WorkPool(ModelBase model)
5052
{
51-
this.model = model;
52-
workQueue = new ConcurrentQueue<Work>();
53-
tokenSource = new CancellationTokenSource();
53+
_model = model;
54+
_workQueue = new ConcurrentQueue<Work>();
55+
_tokenSource = new CancellationTokenSource();
56+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5457
}
5558

5659
public void Start()
5760
{
58-
task = Task.Run(Loop, CancellationToken.None);
61+
_worker = Task.Run(Loop, CancellationToken.None);
5962
}
6063

6164
public void Enqueue(Work work)
6265
{
63-
workQueue.Enqueue(work);
64-
semaphore.Release();
66+
_workQueue.Enqueue(work);
67+
_syncSource.TrySetResult(true);
6568
}
6669

6770
async Task Loop()
6871
{
69-
while (tokenSource.IsCancellationRequested == false)
72+
while (_tokenSource.IsCancellationRequested == false)
7073
{
7174
try
7275
{
73-
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
76+
await _syncSource.Task.ConfigureAwait(false);
77+
_syncSource = TaskCompletionSourceFactory.Create<bool>();
7478
}
7579
catch (TaskCanceledException)
7680
{
7781
// Swallowing the task cancellation in case we are stopping work.
7882
}
7983

80-
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
84+
while (_tokenSource.IsCancellationRequested == false && _workQueue.TryDequeue(out Work work))
8185
{
82-
await work.Execute(model).ConfigureAwait(false);
86+
await work.Execute(_model).ConfigureAwait(false);
8387
}
8488
}
8589
}
8690

8791
public void Stop()
8892
{
89-
tokenSource.Cancel();
93+
_tokenSource.Cancel();
94+
_tokenRegistration.Dispose();
95+
}
96+
97+
static class TaskCompletionSourceFactory
98+
{
99+
#if NETFRAMEWORK
100+
static readonly FieldInfo StateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
101+
#endif
102+
103+
public static TaskCompletionSource<T> Create<T>(TaskCreationOptions options = TaskCreationOptions.None)
104+
{
105+
#if NETFRAMEWORK
106+
//This lovely hack forces the task scheduler to run continuations asynchronously,
107+
//see https://stackoverflow.com/questions/22579206/how-can-i-prevent-synchronous-continuations-on-a-task/22588431#22588431
108+
var tcs = new TaskCompletionSource<T>(options);
109+
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
110+
StateField.SetValue(tcs.Task, (int) StateField.GetValue(tcs.Task) | TASK_STATE_THREAD_WAS_ABORTED);
111+
return tcs;
112+
#else
113+
return new TaskCompletionSource<T>(options | TaskCreationOptions.RunContinuationsAsynchronously);
114+
#endif
115+
}
90116
}
91117
}
92118
}

0 commit comments

Comments
 (0)