Skip to content

Commit 3847a76

Browse files
Stefán J. Sigurðarsonlukebakken
authored andcommitted
Removing the use of a BlockingCollection and replacing with ConcurrentQueue + SemaphoreSlim to get rid of a blocking wait for better concurrency.
1 parent 8778edf commit 3847a76

File tree

1 file changed

+31
-18
lines changed

1 file changed

+31
-18
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -476,10 +476,8 @@ private void Init(IFrameHandler fh)
476476
{
477477
if (ShouldTriggerConnectionRecovery(args))
478478
{
479-
if (!_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.BeginAutomaticRecovery))
480-
{
481-
ESLog.Warn("Failed to notify RecoveryLoop to BeginAutomaticRecovery.");
482-
}
479+
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.BeginAutomaticRecovery);
480+
_semaphore.Release();
483481
}
484482
};
485483
lock (_eventLock)
@@ -963,7 +961,8 @@ private enum RecoveryConnectionState
963961
private Task _recoveryTask;
964962
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;
965963

966-
private readonly BlockingCollection<RecoveryCommand> _recoveryLoopCommandQueue = new BlockingCollection<RecoveryCommand>();
964+
private readonly ConcurrentQueue<RecoveryCommand> _recoveryLoopCommandQueue = new ConcurrentQueue<RecoveryCommand>();
965+
readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
967966
private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource();
968967
private readonly TaskCompletionSource<int> _recoveryLoopComplete = new TaskCompletionSource<int>();
969968

@@ -974,19 +973,31 @@ private async Task MainRecoveryLoop()
974973
{
975974
try
976975
{
977-
while (_recoveryLoopCommandQueue.TryTake(out RecoveryCommand command, -1, _recoveryCancellationToken.Token))
976+
while (!_recoveryCancellationToken.IsCancellationRequested)
978977
{
979-
switch (_recoveryLoopState)
978+
try
979+
{
980+
await _semaphore.WaitAsync(_recoveryCancellationToken.Token).ConfigureAwait(false);
981+
}
982+
catch (TaskCanceledException)
980983
{
981-
case RecoveryConnectionState.Connected:
982-
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
983-
break;
984-
case RecoveryConnectionState.Recovering:
985-
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
986-
break;
987-
default:
988-
ESLog.Warn("RecoveryLoop state is out of range.");
989-
break;
984+
// Swallowing the task cancellation in case we are stopping work.
985+
}
986+
987+
if (!_recoveryCancellationToken.IsCancellationRequested && _recoveryLoopCommandQueue.TryDequeue(out RecoveryCommand command))
988+
{
989+
switch (_recoveryLoopState)
990+
{
991+
case RecoveryConnectionState.Connected:
992+
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
993+
break;
994+
case RecoveryConnectionState.Recovering:
995+
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
996+
break;
997+
default:
998+
ESLog.Warn("RecoveryLoop state is out of range.");
999+
break;
1000+
}
9901001
}
9911002
}
9921003
}
@@ -1034,7 +1045,8 @@ private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
10341045
else
10351046
{
10361047
await Task.Delay(_factory.NetworkRecoveryInterval);
1037-
_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
1048+
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1049+
_semaphore.Release();
10381050
}
10391051

10401052
break;
@@ -1058,7 +1070,8 @@ private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
10581070
case RecoveryCommand.BeginAutomaticRecovery:
10591071
_recoveryLoopState = RecoveryConnectionState.Recovering;
10601072
await Task.Delay(_factory.NetworkRecoveryInterval).ConfigureAwait(false);
1061-
_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
1073+
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1074+
_semaphore.Release();
10621075
break;
10631076
default:
10641077
ESLog.Warn($"RecoveryLoop command {command} is out of range.");

0 commit comments

Comments
 (0)