Skip to content

Commit 1186f3f

Browse files
Merge branch 'michac-michac-autorec-loop-update-squash'
2 parents 8aed00f + a23d6b2 commit 1186f3f

File tree

2 files changed

+119
-33
lines changed

2 files changed

+119
-33
lines changed

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringConnection.cs

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
using RabbitMQ.Client.Events;
4949
using RabbitMQ.Client.Exceptions;
5050
using RabbitMQ.Client.Impl;
51+
52+
using RabbitMQ.Util;
5153
using RabbitMQ.Client.Logging;
5254

5355
namespace RabbitMQ.Client.Framing.Impl
@@ -479,7 +481,6 @@ private void Init(IFrameHandler fh)
479481
if (ShouldTriggerConnectionRecovery(args))
480482
{
481483
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.BeginAutomaticRecovery);
482-
_semaphore.Release();
483484
}
484485
};
485486
lock (_eventLock)
@@ -963,8 +964,7 @@ private enum RecoveryConnectionState
963964
private Task _recoveryTask;
964965
private RecoveryConnectionState _recoveryLoopState = RecoveryConnectionState.Connected;
965966

966-
private readonly ConcurrentQueue<RecoveryCommand> _recoveryLoopCommandQueue = new ConcurrentQueue<RecoveryCommand>();
967-
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
967+
private readonly AsyncConcurrentQueue<RecoveryCommand> _recoveryLoopCommandQueue = new AsyncConcurrentQueue<RecoveryCommand>();
968968
private readonly CancellationTokenSource _recoveryCancellationToken = new CancellationTokenSource();
969969
private readonly TaskCompletionSource<int> _recoveryLoopComplete = new TaskCompletionSource<int>();
970970

@@ -977,29 +977,19 @@ private async Task MainRecoveryLoop()
977977
{
978978
while (!_recoveryCancellationToken.IsCancellationRequested)
979979
{
980-
try
980+
var command = await _recoveryLoopCommandQueue.DequeueAsync(_recoveryCancellationToken.Token).ConfigureAwait(false);
981+
982+
switch (_recoveryLoopState)
981983
{
982-
await _semaphore.WaitAsync(_recoveryCancellationToken.Token).ConfigureAwait(false);
983-
}
984-
catch (TaskCanceledException)
985-
{
986-
// Swallowing the task cancellation in case we are stopping work.
987-
}
988-
989-
if (!_recoveryCancellationToken.IsCancellationRequested && _recoveryLoopCommandQueue.TryDequeue(out RecoveryCommand command))
990-
{
991-
switch (_recoveryLoopState)
992-
{
993-
case RecoveryConnectionState.Connected:
994-
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
995-
break;
996-
case RecoveryConnectionState.Recovering:
997-
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
998-
break;
999-
default:
1000-
ESLog.Warn("RecoveryLoop state is out of range.");
1001-
break;
1002-
}
984+
case RecoveryConnectionState.Connected:
985+
RecoveryLoopConnectedHandler(command);
986+
break;
987+
case RecoveryConnectionState.Recovering:
988+
RecoveryLoopRecoveringHandler(command);
989+
break;
990+
default:
991+
ESLog.Warn("RecoveryLoop state is out of range.");
992+
break;
1003993
}
1004994
}
1005995
}
@@ -1032,7 +1022,7 @@ private void StopRecoveryLoop()
10321022
/// Handles commands when in the Recovering state.
10331023
/// </summary>
10341024
/// <param name="command"></param>
1035-
private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
1025+
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
10361026
{
10371027
switch (command)
10381028
{
@@ -1046,9 +1036,7 @@ private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
10461036
}
10471037
else
10481038
{
1049-
await Task.Delay(_factory.NetworkRecoveryInterval);
1050-
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1051-
_semaphore.Release();
1039+
ScheduleRecoveryRetry();
10521040
}
10531041

10541042
break;
@@ -1062,7 +1050,7 @@ private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
10621050
/// Handles commands when in the Connected state.
10631051
/// </summary>
10641052
/// <param name="command"></param>
1065-
private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
1053+
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
10661054
{
10671055
switch (command)
10681056
{
@@ -1071,14 +1059,24 @@ private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
10711059
break;
10721060
case RecoveryCommand.BeginAutomaticRecovery:
10731061
_recoveryLoopState = RecoveryConnectionState.Recovering;
1074-
await Task.Delay(_factory.NetworkRecoveryInterval).ConfigureAwait(false);
1075-
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1076-
_semaphore.Release();
1062+
ScheduleRecoveryRetry();
10771063
break;
10781064
default:
10791065
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
10801066
break;
10811067
}
10821068
}
1069+
1070+
/// <summary>
1071+
/// Schedule a background Task to signal the command queue when the retry duration has elapsed.
1072+
/// </summary>
1073+
private void ScheduleRecoveryRetry()
1074+
{
1075+
Task.Delay(_factory.NetworkRecoveryInterval)
1076+
.ContinueWith(t =>
1077+
{
1078+
_recoveryLoopCommandQueue.Enqueue(RecoveryCommand.PerformAutomaticRecovery);
1079+
});
1080+
}
10831081
}
10841082
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v1.1:
23+
//
24+
//---------------------------------------------------------------------------
25+
// The contents of this file are subject to the Mozilla Public License
26+
// Version 1.1 (the "License"); you may not use this file except in
27+
// compliance with the License. You may obtain a copy of the License
28+
// at https://www.mozilla.org/MPL/
29+
//
30+
// Software distributed under the License is distributed on an "AS IS"
31+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
32+
// the License for the specific language governing rights and
33+
// limitations under the License.
34+
//
35+
// The Original Code is RabbitMQ.
36+
//
37+
// The Initial Developer of the Original Code is Pivotal Software, Inc.
38+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
39+
//---------------------------------------------------------------------------
40+
41+
using System;
42+
using System.Collections.Concurrent;
43+
using System.Threading;
44+
using System.Threading.Tasks;
45+
46+
namespace RabbitMQ.Util
47+
{
48+
/// <summary>
49+
/// A concurrent queue where Dequeue waits for something to be added if the queue is empty and Enqueue signals
50+
/// that something has been added. Similar in function to a BlockingCollection but with async/await
51+
/// support.
52+
/// </summary>
53+
/// <typeparam name="T">
54+
/// Queue element type
55+
/// </typeparam>
56+
internal class AsyncConcurrentQueue<T>
57+
{
58+
private readonly ConcurrentQueue<T> _internalQueue = new ConcurrentQueue<T>();
59+
private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
60+
61+
/// <summary>
62+
/// Returns a Task that is completed when an object can be returned from the beginning of the queue.
63+
/// </summary>
64+
/// <param name="token"></param>
65+
/// <returns></returns>
66+
public async Task<T> DequeueAsync(CancellationToken token = default)
67+
{
68+
await _semaphore.WaitAsync(token).ConfigureAwait(false);
69+
70+
if (!_internalQueue.TryDequeue(out var command))
71+
{
72+
throw new InvalidOperationException("Internal queue is empty despite receiving an enqueueing signal");
73+
}
74+
75+
return command;
76+
}
77+
78+
/// <summary>
79+
/// Add an object to the end of the queue.
80+
/// </summary>
81+
/// <param name="item"></param>
82+
public void Enqueue(T item)
83+
{
84+
_internalQueue.Enqueue(item);
85+
_semaphore.Release();
86+
}
87+
}
88+
}

0 commit comments

Comments
 (0)