Skip to content

Commit 0d964b6

Browse files
committed
* Start poking around at the reconnection code
1 parent a9b675a commit 0d964b6

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,8 @@ void OnOpened(Amqp.IConnection connection, Open open1)
271271
await _management.OpenAsync()
272272
.ConfigureAwait(false);
273273

274-
_nativeConnection.Closed += MaybeRecoverConnection();
274+
ClosedCallback closedCallback = BuildClosedCallback();
275+
_nativeConnection.AddClosedCallback(closedCallback);
275276
}
276277
catch (AmqpException e)
277278
{
@@ -295,7 +296,7 @@ await _management.OpenAsync()
295296
/// and then kick off a task dedicated to recovery
296297
/// </summary>
297298
/// <returns></returns>
298-
private ClosedCallback MaybeRecoverConnection()
299+
private ClosedCallback BuildClosedCallback()
299300
{
300301
return async (sender, error) =>
301302
{
@@ -309,8 +310,7 @@ await _semaphoreClose.WaitAsync()
309310
if (error != null)
310311
{
311312
// we assume here that the connection is closed unexpectedly, since the error is not null
312-
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} is closed unexpectedly. "
313-
);
313+
Trace.WriteLine(TraceLevel.Warning, $"{ToString()} closed unexpectedly.");
314314

315315
// we have to check if the recovery is active.
316316
// The user may want to disable the recovery mechanism
@@ -327,7 +327,7 @@ await _semaphoreClose.WaitAsync()
327327
OnNewStatus(State.Reconnecting, Utils.ConvertError(error));
328328
ChangeEntitiesStatus(State.Reconnecting, Utils.ConvertError(error));
329329

330-
await Task.Run(async () =>
330+
Task reconnectionTask = Task.Run(async () =>
331331
{
332332
bool connected = false;
333333
// as first step we try to recover the connection
@@ -403,7 +403,10 @@ await _recordingTopologyListener.Accept(visitor)
403403
{
404404
Trace.WriteLine(TraceLevel.Error, $"{ToString()} error trying to reconnect entities {e}");
405405
}
406-
}).ConfigureAwait(false);
406+
});
407+
408+
// TODO reconnection timeout?
409+
await reconnectionTask.ConfigureAwait(false);
407410

408411
return;
409412
}

RabbitMQ.AMQP.Client/Impl/AmqpManagement.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ await EnsureReceiverLinkAsync()
128128
}
129129

130130
OnNewStatus(State.Closed, Utils.ConvertError(error));
131+
// Note: TrySetResult *must* be used here
131132
ConnectionCloseTaskCompletionSource.TrySetResult(true);
132133
};
133134

0 commit comments

Comments
 (0)