Skip to content

Commit 9e1ce33

Browse files
danielmarbachlukebakken
authored andcommitted
Align ConsumerWorkService
1 parent a11303e commit 9e1ce33

File tree

1 file changed

+10
-5
lines changed

1 file changed

+10
-5
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,27 @@ public void StopWork()
4040
class WorkPool
4141
{
4242
readonly ConcurrentQueue<Action> _actions;
43-
readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
4443
readonly CancellationTokenSource _tokenSource;
44+
CancellationTokenRegistration _tokenRegistration;
45+
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
4546
private Task _worker;
4647

4748
public WorkPool()
4849
{
4950
_actions = new ConcurrentQueue<Action>();
5051
_tokenSource = new CancellationTokenSource();
52+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5153
}
5254

5355
public void Start()
5456
{
55-
_worker = Task.Run(Loop);
57+
_worker = Task.Run(Loop, CancellationToken.None);
5658
}
5759

5860
public void Enqueue(Action action)
5961
{
6062
_actions.Enqueue(action);
61-
_semaphore.Release();
63+
_syncSource.TrySetResult(true);
6264
}
6365

6466
async Task Loop()
@@ -67,21 +69,23 @@ async Task Loop()
6769
{
6870
try
6971
{
70-
await _semaphore.WaitAsync(_tokenSource.Token).ConfigureAwait(false);
72+
await _syncSource.Task.ConfigureAwait(false);
73+
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7174
}
7275
catch (TaskCanceledException)
7376
{
7477
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
7578
}
7679

77-
if (!_tokenSource.IsCancellationRequested && _actions.TryDequeue(out Action action))
80+
while (_tokenSource.IsCancellationRequested && _actions.TryDequeue(out Action action))
7881
{
7982
try
8083
{
8184
action();
8285
}
8386
catch (Exception)
8487
{
88+
// ignored
8589
}
8690
}
8791

@@ -91,6 +95,7 @@ async Task Loop()
9195
public void Stop()
9296
{
9397
_tokenSource.Cancel();
98+
_tokenRegistration.Dispose();
9499
}
95100
}
96101
}

0 commit comments

Comments
 (0)