Skip to content

Commit c4200ab

Browse files
committed
Ensure broker-originated channel closure completes
Fixes #1749 * Ensure `Dispose` and `DisposeAsync` are idempotent and thread-safe. * Use TaskCompletionSource when `HandleChannelCloseAsync` runs to allow dispose methods to wait.
1 parent 5b1c9cc commit c4200ab

File tree

6 files changed

+255
-41
lines changed

6 files changed

+255
-41
lines changed

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
4848
private AutorecoveringConnection _connection;
4949
private RecoveryAwareChannel _innerChannel;
5050
private bool _disposed;
51+
private bool _isDisposing;
52+
private readonly object _locker = new();
5153

5254
private ushort _prefetchCountConsumer;
5355
private ushort _prefetchCountGlobal;
@@ -252,7 +254,15 @@ await _connection.DeleteRecordedChannelAsync(this,
252254
public override string ToString()
253255
=> InnerChannel.ToString();
254256

255-
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
257+
public void Dispose()
258+
{
259+
if (_disposed)
260+
{
261+
return;
262+
}
263+
264+
DisposeAsync().AsTask().GetAwaiter().GetResult();
265+
}
256266

257267
public async ValueTask DisposeAsync()
258268
{
@@ -261,14 +271,30 @@ public async ValueTask DisposeAsync()
261271
return;
262272
}
263273

264-
if (IsOpen)
274+
lock (_locker)
265275
{
266-
await this.AbortAsync()
267-
.ConfigureAwait(false);
276+
if (_isDisposing)
277+
{
278+
return;
279+
}
280+
_isDisposing = true;
268281
}
269282

270-
_recordedConsumerTags.Clear();
271-
_disposed = true;
283+
try
284+
{
285+
if (IsOpen)
286+
{
287+
await this.AbortAsync()
288+
.ConfigureAwait(false);
289+
}
290+
291+
_recordedConsumerTags.Clear();
292+
}
293+
finally
294+
{
295+
_disposed = true;
296+
_isDisposing = false;
297+
}
272298
}
273299

274300
public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ internal sealed partial class AutorecoveringConnection : IConnection
5050

5151
private Connection _innerConnection;
5252
private bool _disposed;
53+
private bool _isDisposing;
54+
private readonly object _locker = new();
5355

5456
private Connection InnerConnection
5557
{
@@ -268,7 +270,15 @@ await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, ca
268270
return autorecoveringChannel;
269271
}
270272

271-
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
273+
public void Dispose()
274+
{
275+
if (_disposed)
276+
{
277+
return;
278+
}
279+
280+
DisposeAsync().AsTask().GetAwaiter().GetResult();
281+
}
272282

273283
public async ValueTask DisposeAsync()
274284
{
@@ -277,22 +287,33 @@ public async ValueTask DisposeAsync()
277287
return;
278288
}
279289

290+
lock (_locker)
291+
{
292+
if (_isDisposing)
293+
{
294+
return;
295+
}
296+
_isDisposing = true;
297+
}
298+
280299
try
281300
{
282301
await _innerConnection.DisposeAsync()
283302
.ConfigureAwait(false);
303+
304+
_channels.Clear();
305+
_recordedEntitiesSemaphore.Dispose();
306+
_channelsSemaphore.Dispose();
307+
_recoveryCancellationTokenSource.Dispose();
284308
}
285309
catch (OperationInterruptedException)
286310
{
287311
// ignored, see rabbitmq/rabbitmq-dotnet-client#133
288312
}
289313
finally
290314
{
291-
_channels.Clear();
292-
_recordedEntitiesSemaphore.Dispose();
293-
_channelsSemaphore.Dispose();
294-
_recoveryCancellationTokenSource.Dispose();
295315
_disposed = true;
316+
_isDisposing = false;
296317
}
297318
}
298319

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 111 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,15 @@ internal partial class Channel : IChannel, IRecoverable
5959
private ShutdownEventArgs? _closeReason;
6060
public ShutdownEventArgs? CloseReason => Volatile.Read(ref _closeReason);
6161

62+
private TaskCompletionSource<bool>? _serverOriginatedChannelCloseTcs;
63+
6264
internal readonly IConsumerDispatcher ConsumerDispatcher;
6365

66+
private bool _disposed;
67+
private bool _isDisposing;
68+
69+
private readonly object _locker = new();
70+
6471
public Channel(ISession session, CreateChannelOptions createChannelOptions)
6572
{
6673
ContinuationTimeout = createChannelOptions.ContinuationTimeout;
@@ -514,22 +521,54 @@ public override string ToString()
514521

515522
void IDisposable.Dispose()
516523
{
524+
if (_disposed)
525+
{
526+
return;
527+
}
528+
517529
Dispose(true);
518530
}
519531

520532
protected virtual void Dispose(bool disposing)
521533
{
522-
if (disposing)
534+
if (_disposed)
523535
{
524-
if (IsOpen)
536+
return;
537+
}
538+
539+
lock (_locker)
540+
{
541+
if (_isDisposing)
525542
{
526-
this.AbortAsync().GetAwaiter().GetResult();
543+
return;
527544
}
545+
_isDisposing = true;
546+
}
528547

529-
ConsumerDispatcher.Dispose();
530-
_rpcSemaphore.Dispose();
531-
_confirmSemaphore.Dispose();
532-
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
548+
if (disposing)
549+
{
550+
try
551+
{
552+
if (IsOpen)
553+
{
554+
this.AbortAsync().GetAwaiter().GetResult();
555+
}
556+
557+
if (_serverOriginatedChannelCloseTcs is not null)
558+
{
559+
_serverOriginatedChannelCloseTcs.Task.Wait(TimeSpan.FromSeconds(5));
560+
}
561+
562+
ConsumerDispatcher.Dispose();
563+
_rpcSemaphore.Dispose();
564+
_confirmSemaphore.Dispose();
565+
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
566+
}
567+
finally
568+
{
569+
_disposed = true;
570+
_isDisposing = false;
571+
}
533572
}
534573
}
535574

@@ -543,18 +582,46 @@ await DisposeAsyncCore()
543582

544583
protected virtual async ValueTask DisposeAsyncCore()
545584
{
546-
if (IsOpen)
585+
if (_disposed)
547586
{
548-
await this.AbortAsync().ConfigureAwait(false);
587+
return;
549588
}
550589

551-
ConsumerDispatcher.Dispose();
552-
_rpcSemaphore.Dispose();
553-
_confirmSemaphore.Dispose();
554-
if (_outstandingPublisherConfirmationsRateLimiter is not null)
590+
lock (_locker)
555591
{
556-
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
557-
.ConfigureAwait(false);
592+
if (_isDisposing)
593+
{
594+
return;
595+
}
596+
_isDisposing = true;
597+
}
598+
599+
try
600+
{
601+
if (IsOpen)
602+
{
603+
await this.AbortAsync().ConfigureAwait(false);
604+
}
605+
606+
if (_serverOriginatedChannelCloseTcs is not null)
607+
{
608+
await _serverOriginatedChannelCloseTcs.Task.WaitAsync(TimeSpan.FromSeconds(5))
609+
.ConfigureAwait(false);
610+
}
611+
612+
ConsumerDispatcher.Dispose();
613+
_rpcSemaphore.Dispose();
614+
_confirmSemaphore.Dispose();
615+
if (_outstandingPublisherConfirmationsRateLimiter is not null)
616+
{
617+
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
618+
.ConfigureAwait(false);
619+
}
620+
}
621+
finally
622+
{
623+
_disposed = true;
624+
_isDisposing = false;
558625
}
559626
}
560627

@@ -651,23 +718,38 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
651718

652719
protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, CancellationToken cancellationToken)
653720
{
654-
var channelClose = new ChannelClose(cmd.MethodSpan);
655-
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
656-
channelClose._replyCode,
657-
channelClose._replyText,
658-
channelClose._classId,
659-
channelClose._methodId));
721+
lock (_locker)
722+
{
723+
_serverOriginatedChannelCloseTcs ??= new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
724+
}
660725

661-
await Session.CloseAsync(_closeReason, notify: false)
662-
.ConfigureAwait(false);
726+
try
727+
{
728+
var channelClose = new ChannelClose(cmd.MethodSpan);
729+
SetCloseReason(new ShutdownEventArgs(ShutdownInitiator.Peer,
730+
channelClose._replyCode,
731+
channelClose._replyText,
732+
channelClose._classId,
733+
channelClose._methodId));
663734

664-
var method = new ChannelCloseOk();
665-
await ModelSendAsync(in method, cancellationToken)
666-
.ConfigureAwait(false);
735+
await Session.CloseAsync(_closeReason, notify: false)
736+
.ConfigureAwait(false);
667737

668-
await Session.NotifyAsync(cancellationToken)
669-
.ConfigureAwait(false);
670-
return true;
738+
var method = new ChannelCloseOk();
739+
await ModelSendAsync(in method, cancellationToken)
740+
.ConfigureAwait(false);
741+
742+
await Session.NotifyAsync(cancellationToken)
743+
.ConfigureAwait(false);
744+
745+
_serverOriginatedChannelCloseTcs.TrySetResult(true);
746+
return true;
747+
}
748+
catch (Exception ex)
749+
{
750+
_serverOriginatedChannelCloseTcs.TrySetException(ex);
751+
throw;
752+
}
671753
}
672754

673755
protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ namespace RabbitMQ.Client.Framing
4646
internal sealed partial class Connection : IConnection
4747
{
4848
private bool _disposed;
49+
private bool _isDisposing;
50+
private readonly object _locker = new();
4951
private volatile bool _closed;
5052

5153
private readonly ConnectionConfig _config;
@@ -485,7 +487,15 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
485487
return _frameHandler.WriteAsync(frames, cancellationToken);
486488
}
487489

488-
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();
490+
public void Dispose()
491+
{
492+
if (_disposed)
493+
{
494+
return;
495+
}
496+
497+
DisposeAsync().AsTask().GetAwaiter().GetResult();
498+
}
489499

490500
public async ValueTask DisposeAsync()
491501
{
@@ -494,6 +504,15 @@ public async ValueTask DisposeAsync()
494504
return;
495505
}
496506

507+
lock (_locker)
508+
{
509+
if (_isDisposing)
510+
{
511+
return;
512+
}
513+
_isDisposing = true;
514+
}
515+
497516
try
498517
{
499518
if (IsOpen)
@@ -515,6 +534,7 @@ await _channel0.DisposeAsync()
515534
finally
516535
{
517536
_disposed = true;
537+
_isDisposing = false;
518538
}
519539
}
520540

0 commit comments

Comments
 (0)