47
47
48
48
using RabbitMQ . Client . Events ;
49
49
using RabbitMQ . Client . Impl ;
50
+ using RabbitMQ . Util ;
50
51
51
52
namespace RabbitMQ . Client . Framing . Impl
52
53
{
@@ -477,7 +478,6 @@ private void Init(IFrameHandler fh)
477
478
if ( ShouldTriggerConnectionRecovery ( args ) )
478
479
{
479
480
_recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . BeginAutomaticRecovery ) ;
480
- _semaphore . Release ( ) ;
481
481
}
482
482
} ;
483
483
lock ( _eventLock )
@@ -961,8 +961,7 @@ private enum RecoveryConnectionState
961
961
private Task _recoveryTask ;
962
962
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState . Connected ;
963
963
964
- private readonly ConcurrentQueue < RecoveryCommand > _recoveryLoopCommandQueue = new ConcurrentQueue < RecoveryCommand > ( ) ;
965
- private readonly SemaphoreSlim _semaphore = new SemaphoreSlim ( 0 ) ;
964
+ private readonly AsyncConcurrentQueue < RecoveryCommand > _recoveryLoopCommandQueue = new AsyncConcurrentQueue < RecoveryCommand > ( ) ;
966
965
private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource ( ) ;
967
966
private readonly TaskCompletionSource < int > _recoveryLoopComplete = new TaskCompletionSource < int > ( ) ;
968
967
@@ -975,29 +974,19 @@ private async Task MainRecoveryLoop()
975
974
{
976
975
while ( ! _recoveryCancellationToken . IsCancellationRequested )
977
976
{
978
- try
979
- {
980
- await _semaphore . WaitAsync ( _recoveryCancellationToken . Token ) . ConfigureAwait ( false ) ;
981
- }
982
- catch ( TaskCanceledException )
983
- {
984
- // Swallowing the task cancellation in case we are stopping work.
985
- }
986
-
987
- if ( ! _recoveryCancellationToken . IsCancellationRequested && _recoveryLoopCommandQueue . TryDequeue ( out RecoveryCommand command ) )
977
+ var command = await _recoveryLoopCommandQueue . DequeueAsync ( _recoveryCancellationToken . Token ) . ConfigureAwait ( false ) ;
978
+
979
+ switch ( _recoveryLoopState )
988
980
{
989
- switch ( _recoveryLoopState )
990
- {
991
- case RecoveryConnectionState . Connected :
992
- await RecoveryLoopConnectedHandler ( command ) . ConfigureAwait ( false ) ;
993
- break ;
994
- case RecoveryConnectionState . Recovering :
995
- await RecoveryLoopRecoveringHandler ( command ) . ConfigureAwait ( false ) ;
996
- break ;
997
- default :
998
- ESLog . Warn ( "RecoveryLoop state is out of range." ) ;
999
- break ;
1000
- }
981
+ case RecoveryConnectionState . Connected :
982
+ RecoveryLoopConnectedHandler ( command ) ;
983
+ break ;
984
+ case RecoveryConnectionState . Recovering :
985
+ RecoveryLoopRecoveringHandler ( command ) ;
986
+ break ;
987
+ default :
988
+ ESLog . Warn ( "RecoveryLoop state is out of range." ) ;
989
+ break ;
1001
990
}
1002
991
}
1003
992
}
@@ -1030,7 +1019,7 @@ private void StopRecoveryLoop()
1030
1019
/// Handles commands when in the Recovering state.
1031
1020
/// </summary>
1032
1021
/// <param name="command"></param>
1033
- private async Task RecoveryLoopRecoveringHandler ( RecoveryCommand command )
1022
+ private void RecoveryLoopRecoveringHandler ( RecoveryCommand command )
1034
1023
{
1035
1024
switch ( command )
1036
1025
{
@@ -1044,9 +1033,7 @@ private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
1044
1033
}
1045
1034
else
1046
1035
{
1047
- await Task . Delay ( _factory . NetworkRecoveryInterval ) ;
1048
- _recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . PerformAutomaticRecovery ) ;
1049
- _semaphore . Release ( ) ;
1036
+ ScheduleRecoveryRetry ( ) ;
1050
1037
}
1051
1038
1052
1039
break ;
@@ -1060,7 +1047,7 @@ private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
1060
1047
/// Handles commands when in the Connected state.
1061
1048
/// </summary>
1062
1049
/// <param name="command"></param>
1063
- private async Task RecoveryLoopConnectedHandler ( RecoveryCommand command )
1050
+ private void RecoveryLoopConnectedHandler ( RecoveryCommand command )
1064
1051
{
1065
1052
switch ( command )
1066
1053
{
@@ -1069,14 +1056,24 @@ private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
1069
1056
break ;
1070
1057
case RecoveryCommand . BeginAutomaticRecovery :
1071
1058
_recoveryLoopState = RecoveryConnectionState . Recovering ;
1072
- await Task . Delay ( _factory . NetworkRecoveryInterval ) . ConfigureAwait ( false ) ;
1073
- _recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . PerformAutomaticRecovery ) ;
1074
- _semaphore . Release ( ) ;
1059
+ ScheduleRecoveryRetry ( ) ;
1075
1060
break ;
1076
1061
default :
1077
1062
ESLog . Warn ( $ "RecoveryLoop command { command } is out of range.") ;
1078
1063
break ;
1079
1064
}
1080
1065
}
1066
+
1067
+ /// <summary>
1068
+ /// Schedule a background Task to signal the command queue when the retry duration has elapsed.
1069
+ /// </summary>
1070
+ private void ScheduleRecoveryRetry ( )
1071
+ {
1072
+ Task . Delay ( _factory . NetworkRecoveryInterval )
1073
+ . ContinueWith ( t =>
1074
+ {
1075
+ _recoveryLoopCommandQueue . Enqueue ( RecoveryCommand . PerformAutomaticRecovery ) ;
1076
+ } ) ;
1077
+ }
1081
1078
}
1082
1079
}
0 commit comments