Skip to content

Commit 04ee878

Browse files
committed
* Make reconnection logic a bit easier to follow.
* No need for a separate reconnection Task
1 parent 0d964b6 commit 04ee878

File tree

1 file changed

+84
-78
lines changed

1 file changed

+84
-78
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 84 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -307,18 +307,31 @@ await _semaphoreClose.WaitAsync()
307307
// close all the sessions, if the connection is closed the sessions are not valid anymore
308308
_nativePubSubSessions.ClearSessions();
309309

310-
if (error != null)
310+
void DoClose(Error? argError = null)
311+
{
312+
Error? err = argError ?? Utils.ConvertError(error);
313+
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
314+
OnNewStatus(State.Closed, err);
315+
ChangeEntitiesStatus(State.Closed, err);
316+
ConnectionCloseTaskCompletionSource.SetResult(true);
317+
}
318+
319+
if (error is null)
320+
{
321+
DoClose();
322+
return;
323+
}
324+
else
311325
{
312326
// we assume here that the connection is closed unexpectedly, since the error is not null
313327
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} closed unexpectedly.");
314328

315329
// we have to check if the recovery is active.
316330
// The user may want to disable the recovery mechanism
317331
// the user can use the lifecycle callback to handle the error
318-
if (!_connectionSettings.Recovery.IsActivate())
332+
if (false == _connectionSettings.Recovery.IsActivate())
319333
{
320-
OnNewStatus(State.Closed, Utils.ConvertError(error));
321-
ChangeEntitiesStatus(State.Closed, Utils.ConvertError(error));
334+
DoClose();
322335
return;
323336
}
324337

@@ -327,99 +340,92 @@ await _semaphoreClose.WaitAsync()
327340
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
328341
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
329342

330-
Task reconnectionTask = Task.Run(async () =>
343+
IBackOffDelayPolicy backOffDelayPolicy = _connectionSettings.Recovery.GetBackOffDelayPolicy();
344+
bool connected = false;
345+
// as first step we try to recover the connection
346+
// so the connected flag is false
347+
while (false == connected &&
348+
// we have to check if the backoff policy is active
349+
// the user may want to disable the backoff policy or
350+
// the backoff policy is not active due of some condition
351+
// for example: Reaching the maximum number of retries and avoid the forever loop
352+
backOffDelayPolicy.IsActive() &&
353+
354+
// even we set the status to reconnecting up, we need to check if the connection is still in the
355+
// reconnecting status. The user may close the connection in the meanwhile
356+
State == State.Reconnecting)
331357
{
332-
bool connected = false;
333-
// as first step we try to recover the connection
334-
// so the connected flag is false
335-
while (!connected &&
336-
// we have to check if the backoff policy is active
337-
// the user may want to disable the backoff policy or
338-
// the backoff policy is not active due of some condition
339-
// for example: Reaching the maximum number of retries and avoid the forever loop
340-
_connectionSettings.Recovery.GetBackOffDelayPolicy().IsActive() &&
341-
342-
// even we set the status to reconnecting up, we need to check if the connection is still in the
343-
// reconnecting status. The user may close the connection in the meanwhile
344-
State == State.Reconnecting)
345-
{
346-
try
347-
{
348-
int nextDelayMs = _connectionSettings.Recovery.GetBackOffDelayPolicy().Delay();
349-
350-
Trace.WriteLine(TraceLevel.Information,
351-
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
352-
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
353-
354-
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
355-
.ConfigureAwait(false);
356-
357-
await OpenConnectionAsync()
358-
.ConfigureAwait(false);
359-
360-
connected = true;
361-
}
362-
catch (Exception e)
363-
{
364-
Trace.WriteLine(TraceLevel.Warning,
365-
$"{ToString()} Error trying to recover connection {e}");
366-
}
367-
}
368-
369-
_connectionSettings.Recovery.GetBackOffDelayPolicy().Reset();
370-
string connectionDescription = connected ? "recovered" : "not recovered";
371-
Trace.WriteLine(TraceLevel.Information,
372-
$"{ToString()} is {connectionDescription}");
373-
374-
if (!connected)
358+
try
375359
{
376-
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} connection is closed");
377-
OnNewStatus(State.Closed,
378-
new Error(ConnectionNotRecoveredCode,
379-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
360+
int nextDelayMs = backOffDelayPolicy.Delay();
380361

381-
ChangeEntitiesStatus(State.Closed, new Error(ConnectionNotRecoveredCode,
382-
$"{ConnectionNotRecoveredMessage}, recover status: {_connectionSettings.Recovery}"));
362+
Trace.WriteLine(TraceLevel.Information,
363+
$"{ToString()} is trying Recovering connection in {nextDelayMs} milliseconds, " +
364+
$"attempt: {_connectionSettings.Recovery.GetBackOffDelayPolicy().CurrentAttempt}. ");
383365

384-
return;
385-
}
386-
387-
if (_connectionSettings.Recovery.IsTopologyActive())
388-
{
389-
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
390-
var visitor = new Visitor(_management);
391-
await _recordingTopologyListener.Accept(visitor)
366+
await Task.Delay(TimeSpan.FromMilliseconds(nextDelayMs))
392367
.ConfigureAwait(false);
393-
}
394368

395-
OnNewStatus(State.Open, null);
396-
// after the connection is recovered we have to reconnect all the publishers and consumers
369+
await OpenConnectionAsync()
370+
.ConfigureAwait(false);
397371

398-
try
399-
{
400-
await ReconnectEntitiesAsync().ConfigureAwait(false);
372+
connected = true;
401373
}
402374
catch (Exception e)
403375
{
404-
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
376+
// TODO this could / should be more visible to the user, perhaps?
377+
Trace.WriteLine(TraceLevel.Warning,
378+
$"{ToString()} Error trying to recover connection {e}");
405379
}
406-
});
380+
}
407381

408-
// TODO reconnection timeout?
409-
await reconnectionTask.ConfigureAwait(false);
382+
backOffDelayPolicy.Reset();
383+
string connectionDescription = connected ? "recovered" : "not recovered";
384+
Trace.WriteLine(TraceLevel.Information,
385+
$"{ToString()} is {connectionDescription}");
410386

411-
return;
412-
}
387+
if (false == connected)
388+
{
389+
var notRecoveredError = new Error(ConnectionNotRecoveredCode,
390+
$"{ConnectionNotRecoveredMessage}," +
391+
$"recover status: {_connectionSettings.Recovery}");
392+
DoClose(notRecoveredError);
393+
return;
394+
}
395+
396+
if (_connectionSettings.Recovery.IsTopologyActive())
397+
{
398+
Trace.WriteLine(TraceLevel.Information, $"{ToString()} Recovering topology");
399+
var visitor = new Visitor(_management);
400+
await _recordingTopologyListener.Accept(visitor)
401+
.ConfigureAwait(false);
402+
}
413403

414-
Trace.WriteLine(TraceLevel.Verbose, $"{ToString()} is closed");
415-
OnNewStatus(State.Closed, Utils.ConvertError(error));
404+
OnNewStatus(State.Open, null);
405+
// after the connection is recovered we have to reconnect all the publishers and consumers
406+
407+
try
408+
{
409+
await ReconnectEntitiesAsync().ConfigureAwait(false);
410+
}
411+
catch (Exception e)
412+
{
413+
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
414+
}
415+
}
416+
}
417+
catch
418+
{
419+
// TODO set states to Closed? Error?
420+
// This will be skipped if reconnection succeeds, but if there
421+
// is an exception, it's important that this be called.
422+
ConnectionCloseTaskCompletionSource.SetResult(true);
423+
throw;
416424
}
417425
finally
418426
{
419427
_semaphoreClose.Release();
420428
}
421-
422-
ConnectionCloseTaskCompletionSource.SetResult(true);
423429
};
424430
}
425431

0 commit comments

Comments
 (0)