Skip to content

Commit 7701fbd

Browse files
danielmarbachlukebakken
authored andcommitted
Make channel implement IAsyncDisposable
1 parent 13fa6b2 commit 7701fbd

File tree

3 files changed

+40
-3
lines changed

3 files changed

+40
-3
lines changed

projects/RabbitMQ.Client/IChannel.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ namespace RabbitMQ.Client
4242
/// functionality offered by versions 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.
4343
/// </summary>
4444
/// <remarks>
45-
/// Extends the <see cref="IDisposable"/> interface, so that the "using"
45+
/// Extends the <see cref="IDisposable"/> interface and the <see cref="IAsyncDisposable"/> interface, so that the "using"
4646
/// statement can be used to scope the lifetime of a channel when appropriate.
4747
/// </remarks>
48-
public interface IChannel : IDisposable
48+
public interface IChannel : IAsyncDisposable, IDisposable
4949
{
5050
/// <summary>
5151
/// Channel number, unique per connections.

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,23 @@ public void Dispose()
273273
_disposed = true;
274274
}
275275

276+
public async ValueTask DisposeAsync()
277+
{
278+
if (_disposed)
279+
{
280+
return;
281+
}
282+
283+
if (IsOpen)
284+
{
285+
await this.AbortAsync()
286+
.ConfigureAwait(false);
287+
}
288+
289+
_recordedConsumerTags.Clear();
290+
_disposed = true;
291+
}
292+
276293
public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);
277294

278295
public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/ChannelBase.cs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -563,11 +563,31 @@ protected virtual void Dispose(bool disposing)
563563
}
564564

565565
ConsumerDispatcher.Dispose();
566-
_rpcSemaphore.Dispose();
566+
_rpcSemaphore?.Dispose();
567567
_confirmSemaphore?.Dispose();
568568
}
569569
}
570570

571+
public async ValueTask DisposeAsync()
572+
{
573+
await DisposeAsyncCore()
574+
.ConfigureAwait(false);
575+
576+
Dispose(false);
577+
}
578+
579+
protected virtual async ValueTask DisposeAsyncCore()
580+
{
581+
if (IsOpen)
582+
{
583+
await this.AbortAsync().ConfigureAwait(false);
584+
}
585+
586+
ConsumerDispatcher.Dispose();
587+
_rpcSemaphore.Dispose();
588+
_confirmSemaphore?.Dispose();
589+
}
590+
571591
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
572592
{
573593
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);

0 commit comments

Comments
 (0)