Skip to content

Commit 0edb5ed

Browse files
Merge pull request #772 from danielmarbach/fix-consumer-loop
Fix consumer loop
2 parents a11303e + 4bde9d2 commit 0edb5ed

File tree

1 file changed

+10
-6
lines changed

1 file changed

+10
-6
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ConsumerWorkService.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,25 +40,27 @@ public void StopWork()
4040
class WorkPool
4141
{
4242
readonly ConcurrentQueue<Action> _actions;
43-
readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
4443
readonly CancellationTokenSource _tokenSource;
44+
CancellationTokenRegistration _tokenRegistration;
45+
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
4546
private Task _worker;
4647

4748
public WorkPool()
4849
{
4950
_actions = new ConcurrentQueue<Action>();
5051
_tokenSource = new CancellationTokenSource();
52+
_tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled());
5153
}
5254

5355
public void Start()
5456
{
55-
_worker = Task.Run(Loop);
57+
_worker = Task.Run(Loop, CancellationToken.None);
5658
}
5759

5860
public void Enqueue(Action action)
5961
{
6062
_actions.Enqueue(action);
61-
_semaphore.Release();
63+
_syncSource.TrySetResult(true);
6264
}
6365

6466
async Task Loop()
@@ -67,30 +69,32 @@ async Task Loop()
6769
{
6870
try
6971
{
70-
await _semaphore.WaitAsync(_tokenSource.Token).ConfigureAwait(false);
72+
await _syncSource.Task.ConfigureAwait(false);
73+
_syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7174
}
7275
catch (TaskCanceledException)
7376
{
7477
// Swallowing the task cancellation exception for the semaphore in case we are stopping.
7578
}
7679

77-
if (!_tokenSource.IsCancellationRequested && _actions.TryDequeue(out Action action))
80+
while (_tokenSource.IsCancellationRequested == false && _actions.TryDequeue(out Action action))
7881
{
7982
try
8083
{
8184
action();
8285
}
8386
catch (Exception)
8487
{
88+
// ignored
8589
}
8690
}
87-
8891
}
8992
}
9093

9194
public void Stop()
9295
{
9396
_tokenSource.Cancel();
97+
_tokenRegistration.Dispose();
9498
}
9599
}
96100
}

0 commit comments

Comments
 (0)