Skip to content

Commit 3b0ec95

Browse files
authored
Merge pull request #777 from rabbitmq/rabbitmq-dotnet-client-775-772-5.x
Back-port several master PRs to 5.x branch
2 parents 6ed209e + c6395dc commit 3b0ec95

File tree

3 files changed

+91
-40
lines changed

3 files changed

+91
-40
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AsyncConsumerWorkService.cs

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
1-
using RabbitMQ.Client.Impl;
2-
3-
using System.Collections.Concurrent;
1+
using System.Collections.Concurrent;
2+
using System.Reflection;
43
using System.Threading;
54
using System.Threading.Tasks;
65

6+
using RabbitMQ.Client.Impl;
7+
78
namespace RabbitMQ.Client
89
{
9-
internal class AsyncConsumerWorkService : ConsumerWorkService
10+
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
1011
{
11-
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
12+
private readonly ConcurrentDictionary<IModel, WorkPool> _workPools = new ConcurrentDictionary<IModel, WorkPool>();
1213

1314
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
1415
{
15-
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
16+
_workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
1617
}
1718

1819
private WorkPool StartNewWorkPool(IModel model)
@@ -24,69 +25,94 @@ private WorkPool StartNewWorkPool(IModel model)
2425

2526
public void Stop(IModel model)
2627
{
27-
if (workPools.TryRemove(model, out WorkPool workPool))
28+
if (_workPools.TryRemove(model, out WorkPool workPool))
2829
{
2930
workPool.Stop();
3031
}
3132
}
3233

3334
public void Stop()
3435
{
35-
foreach (IModel model in workPools.Keys)
36+
foreach (IModel model in _workPools.Keys)
3637
{
3738
Stop(model);
3839
}
3940
}
4041

4142
class WorkPool
4243
{
43-
readonly ConcurrentQueue<Work> workQueue;
44-
readonly CancellationTokenSource tokenSource;
45-
readonly ModelBase model;
46-
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
47-
private Task task;
44+
readonly ConcurrentQueue<Work> _workQueue;
45+
readonly CancellationTokenSource _tokenSource;
46+
readonly ModelBase _model;
47+
readonly CancellationTokenRegistration _tokenRegistration;
48+
volatile TaskCompletionSource<bool> _syncSource = TaskCompletionSourceFactory.Create<bool>();
49+
private Task _worker;
4850

4951
public WorkPool(ModelBase model)
5052
{
51-
this.model = model;
52-
workQueue = new ConcurrentQueue<Work>();
53-
tokenSource = new CancellationTokenSource();
53+
_model = model;
54+
_workQueue = new ConcurrentQueue<Work>();
55+
_tokenSource = new CancellationTokenSource();
56+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5457
}
5558

5659
public void Start()
5760
{
58-
task = Task.Run(Loop, CancellationToken.None);
61+
_worker = Task.Run(Loop, CancellationToken.None);
5962
}
6063

6164
public void Enqueue(Work work)
6265
{
63-
workQueue.Enqueue(work);
64-
semaphore.Release();
66+
_workQueue.Enqueue(work);
67+
_syncSource.TrySetResult(true);
6568
}
6669

6770
async Task Loop()
6871
{
69-
while (tokenSource.IsCancellationRequested == false)
72+
while (_tokenSource.IsCancellationRequested == false)
7073
{
7174
try
7275
{
73-
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
76+
await _syncSource.Task.ConfigureAwait(false);
77+
_syncSource = TaskCompletionSourceFactory.Create<bool>();
7478
}
7579
catch (TaskCanceledException)
7680
{
7781
// Swallowing the task cancellation in case we are stopping work.
7882
}
7983

80-
if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
84+
while (_tokenSource.IsCancellationRequested == false && _workQueue.TryDequeue(out Work work))
8185
{
82-
await work.Execute(model).ConfigureAwait(false);
86+
await work.Execute(_model).ConfigureAwait(false);
8387
}
8488
}
8589
}
8690

8791
public void Stop()
8892
{
89-
tokenSource.Cancel();
93+
_tokenSource.Cancel();
94+
_tokenRegistration.Dispose();
95+
}
96+
97+
static class TaskCompletionSourceFactory
98+
{
99+
#if NETFRAMEWORK
100+
static readonly FieldInfo StateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
101+
#endif
102+
103+
public static TaskCompletionSource<T> Create<T>(TaskCreationOptions options = TaskCreationOptions.None)
104+
{
105+
#if NETFRAMEWORK
106+
//This lovely hack forces the task scheduler to run continuations asynchronously,
107+
//see https://stackoverflow.com/questions/22579206/how-can-i-prevent-synchronous-continuations-on-a-task/22588431#22588431
108+
var tcs = new TaskCompletionSource<T>(options);
109+
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
110+
StateField.SetValue(tcs.Task, (int) StateField.GetValue(tcs.Task) | TASK_STATE_THREAD_WAS_ABORTED);
111+
return tcs;
112+
#else
113+
return new TaskCompletionSource<T>(options | TaskCreationOptions.RunContinuationsAsynchronously);
114+
#endif
115+
}
90116
}
91117
}
92118
}

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Concurrent;
3+
using System.Reflection;
34
using System.Threading;
45
using System.Threading.Tasks;
56

@@ -39,58 +40,83 @@ public void StopWork()
3940

4041
class WorkPool
4142
{
42-
readonly ConcurrentQueue<Action> actions;
43-
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
44-
readonly CancellationTokenSource tokenSource;
45-
private Task worker;
43+
readonly ConcurrentQueue<Action> _actions;
44+
readonly CancellationTokenSource _tokenSource;
45+
readonly CancellationTokenRegistration _tokenRegistration;
46+
volatile TaskCompletionSource<bool> _syncSource = TaskCompletionSourceFactory.Create<bool>();
47+
private Task _worker;
4648

4749
public WorkPool(IModel model)
4850
{
49-
actions = new ConcurrentQueue<Action>();
50-
tokenSource = new CancellationTokenSource();
51+
_actions = new ConcurrentQueue<Action>();
52+
_tokenSource = new CancellationTokenSource();
53+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5154
}
5255

5356
public void Start()
5457
{
55-
worker = Task.Run(Loop);
58+
_worker = Task.Run(Loop, CancellationToken.None);
5659
}
5760

5861
public void Enqueue(Action action)
5962
{
60-
actions.Enqueue(action);
61-
semaphore.Release();
63+
_actions.Enqueue(action);
64+
_syncSource.TrySetResult(true);
6265
}
6366

6467
async Task Loop()
6568
{
66-
while (tokenSource.IsCancellationRequested == false)
69+
while (_tokenSource.IsCancellationRequested == false)
6770
{
6871
try
6972
{
70-
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
73+
await _syncSource.Task.ConfigureAwait(false);
74+
_syncSource = TaskCompletionSourceFactory.Create<bool>();
7175
}
7276
catch (TaskCanceledException)
7377
{
74-
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
78+
// Swallowing the task cancellation in case we are stopping work.
7579
}
7680

77-
if (!tokenSource.IsCancellationRequested && actions.TryDequeue(out Action action))
81+
while (_tokenSource.IsCancellationRequested == false && _actions.TryDequeue(out Action action))
7882
{
7983
try
8084
{
8185
action();
8286
}
8387
catch (Exception)
8488
{
89+
// ignored
8590
}
8691
}
87-
8892
}
8993
}
9094

9195
public void Stop()
9296
{
93-
tokenSource.Cancel();
97+
_tokenSource.Cancel();
98+
_tokenRegistration.Dispose();
99+
}
100+
101+
static class TaskCompletionSourceFactory
102+
{
103+
#if NETFRAMEWORK
104+
static readonly FieldInfo StateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
105+
#endif
106+
107+
public static TaskCompletionSource<T> Create<T>(TaskCreationOptions options = TaskCreationOptions.None)
108+
{
109+
#if NETFRAMEWORK
110+
//This lovely hack forces the task scheduler to run continuations asynchronously,
111+
//see https://stackoverflow.com/questions/22579206/how-can-i-prevent-synchronous-continuations-on-a-task/22588431#22588431
112+
var tcs = new TaskCompletionSource<T>(options);
113+
const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;
114+
StateField.SetValue(tcs.Task, (int) StateField.GetValue(tcs.Task) | TASK_STATE_THREAD_WAS_ABORTED);
115+
return tcs;
116+
#else
117+
return new TaskCompletionSource<T>(options | TaskCreationOptions.RunContinuationsAsynchronously);
118+
#endif
119+
}
94120
}
95121
}
96122
}

projects/client/Unit/src/unit/APIApproval.Approve.approved.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3184,7 +3184,6 @@ namespace RabbitMQ.Client.Impl
31843184
void HandleBasicDeliver(RabbitMQ.Client.IBasicConsumer consumer, string consumerTag, ulong deliveryTag, bool redelivered, string exchange, string routingKey, RabbitMQ.Client.IBasicProperties basicProperties, byte[] body);
31853185
void HandleModelShutdown(RabbitMQ.Client.IBasicConsumer consumer, RabbitMQ.Client.ShutdownEventArgs reason);
31863186
void Quiesce();
3187-
void Shutdown();
31883187
void Shutdown(RabbitMQ.Client.IModel model);
31893188
}
31903189
public interface IFrameHandler

0 commit comments

Comments
 (0)