Skip to content

AsyncDisposable #1684

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
https://learn.microsoft.com/en-us/answers/questions/1371494/for-net-standard-2-0-library-why-add-net-core-3-1
https://devblogs.microsoft.com/dotnet/embracing-nullable-reference-types/#what-should-library-authors-do
-->
<LangVersion>8.0</LangVersion>
<LangVersion>9.0</LangVersion>
<Nullable>enable</Nullable>
</PropertyGroup>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>../../packages</PackageOutputPath>
<PackageReadmeFile>README.md</PackageReadmeFile>
<LangVersion>7.3</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Release' And '$(CI)' == 'true'">
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ namespace RabbitMQ.Client
/// functionality offered by versions 0-8, 0-8qpid, 0-9 and 0-9-1 of AMQP.
/// </summary>
/// <remarks>
/// Extends the <see cref="IDisposable"/> interface, so that the "using"
/// Extends the <see cref="IDisposable"/> interface and the <see cref="IAsyncDisposable"/> interface, so that the "using"
/// statement can be used to scope the lifetime of a channel when appropriate.
/// </remarks>
public interface IChannel : IDisposable
public interface IChannel : IAsyncDisposable, IDisposable
{
/// <summary>
/// Channel number, unique per connections.
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ namespace RabbitMQ.Client
/// Alternatively, an API tutorial can be found in the User Guide.
/// </para>
/// <para>
/// Extends the <see cref="IDisposable"/> interface, so that the "using"
/// Extends the <see cref="IDisposable"/> and the <see cref="IAsyncDisposable"/> interface, so that the "using"
/// statement can be used to scope the lifetime of a connection when
/// appropriate.
/// </para>
/// </remarks>
public interface IConnection : INetworkConnection, IDisposable
public interface IConnection : INetworkConnection, IAsyncDisposable, IDisposable
{
/// <summary>
/// The maximum channel number this connection supports (0 if unlimited).
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ await _connection.DeleteRecordedChannelAsync(this,
public override string ToString()
=> InnerChannel.ToString();

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -266,7 +268,8 @@ public void Dispose()

if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
await this.AbortAsync()
.ConfigureAwait(false);
}

_recordedConsumerTags.Clear();
Expand Down
21 changes: 12 additions & 9 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.Recovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,12 @@ private async ValueTask RecoverExchangesAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
await recordedExchange.RecoverAsync(ch, cancellationToken)
await recordedExchange.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -351,11 +352,12 @@ private async Task RecoverQueuesAsync(IConnection connection,
try
{
string newName = string.Empty;
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
newName = await recordedQueue.RecoverAsync(ch, cancellationToken)
newName = await recordedQueue.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
string oldName = recordedQueue.Name;
Expand Down Expand Up @@ -463,11 +465,12 @@ private async ValueTask RecoverBindingsAsync(IConnection connection,
{
try
{
using (IChannel ch = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false))
IChannel channel = await connection.CreateChannelAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
await using (channel.ConfigureAwait(false))
{
await binding.RecoverAsync(ch, cancellationToken)
await binding.RecoverAsync(channel, cancellationToken)
.ConfigureAwait(false);
await ch.CloseAsync(cancellationToken)
await channel.CloseAsync(cancellationToken)
.ConfigureAwait(false);
}
}
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,9 @@ await RecordChannelAsync(channel, channelsSemaphoreHeld: false, cancellationToke
return channel;
}

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -273,7 +275,8 @@ public void Dispose()

try
{
_innerConnection.Dispose();
await _innerConnection.DisposeAsync()
.ConfigureAwait(false);
}
catch (OperationInterruptedException)
{
Expand Down
20 changes: 20 additions & 0 deletions projects/RabbitMQ.Client/Impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,26 @@ protected virtual void Dispose(bool disposing)
}
}

public async ValueTask DisposeAsync()
{
await DisposeAsyncCore()
.ConfigureAwait(false);

Dispose(false);
}

protected virtual async ValueTask DisposeAsyncCore()
{
if (IsOpen)
{
await this.AbortAsync().ConfigureAwait(false);
}

ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore?.Dispose();
}

public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
{
var method = new ConnectionTuneOk(channelMax, frameMax, heartbeat);
Expand Down
7 changes: 5 additions & 2 deletions projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ internal ValueTask WriteAsync(RentedMemory frames, CancellationToken cancellatio
return _frameHandler.WriteAsync(frames, cancellationToken);
}

public void Dispose()
public void Dispose() => DisposeAsync().AsTask().GetAwaiter().GetResult();

public async ValueTask DisposeAsync()
{
if (_disposed)
{
Expand All @@ -496,7 +498,8 @@ public void Dispose()
{
if (IsOpen)
{
this.AbortAsync().GetAwaiter().GetResult();
await this.AbortAsync()
.ConfigureAwait(false);
}

_session0.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

<PropertyGroup>
<OutputType>Exe</OutputType>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
3 changes: 1 addition & 2 deletions projects/Test/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public static async Task Main()
doneEvent = new AutoResetEvent(false);

var connectionFactory = new ConnectionFactory { };
IConnection connection = await connectionFactory.CreateConnectionAsync();
await using IConnection connection = await connectionFactory.CreateConnectionAsync();

var watch = Stopwatch.StartNew();
_ = Task.Run(async () =>
Expand Down Expand Up @@ -55,7 +55,6 @@ public static async Task Main()
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");

connection.Dispose();
Console.ReadLine();
}
}
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Applications/GH-1647/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@

var props = new BasicProperties();
byte[] msg = Encoding.UTF8.GetBytes("test");
using var connection = await connectionFactory.CreateConnectionAsync();
await using var connection = await connectionFactory.CreateConnectionAsync();
for (int i = 0; i < 300; i++)
{
try
{
using var channel = await connection.CreateChannelAsync(); // New channel for each message
await using var channel = await connection.CreateChannelAsync(); // New channel for each message
await Task.Delay(1000);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: string.Empty,
mandatory: false, basicProperties: props, body: msg);
Expand Down
4 changes: 2 additions & 2 deletions projects/Test/Applications/MassPublish/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ static Program()

static async Task Main()
{
using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync();
await using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync();
consumeConnection.ConnectionShutdownAsync += ConnectionShutdownAsync;

using IChannel consumeChannel = await consumeConnection.CreateChannelAsync();
await using IChannel consumeChannel = await consumeConnection.CreateChannelAsync();
consumeChannel.ChannelShutdownAsync += Channel_ChannelShutdownAsync;
await consumeChannel.BasicQosAsync(prefetchSize: 0, prefetchCount: 128, global: false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ static async Task PublishMessagesIndividuallyAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand All @@ -51,8 +51,8 @@ static async Task PublishMessagesInBatchAsync()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms in batches");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down Expand Up @@ -97,8 +97,8 @@ async Task HandlePublishConfirmsAsynchronously()
{
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms asynchronously");

using IConnection connection = await CreateConnectionAsync();
using IChannel channel = await connection.CreateChannelAsync();
await using IConnection connection = await CreateConnectionAsync();
await using IChannel channel = await connection.CreateChannelAsync();

// declare a server-named queue
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Common/Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<AssemblyOriginatorKeyFile>../../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<IsTestProject>false</IsTestProject>
<LangVersion>7.3</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ public override async Task DisposeAsync()
{
ConnectionFactory cf = CreateConnectionFactory();
cf.ClientProvidedName += "-TearDown";
using (IConnection conn = await cf.CreateConnectionAsync())
await using (IConnection conn = await cf.CreateConnectionAsync())
{
using (IChannel ch = await conn.CreateChannelAsync())
await using (IChannel ch = await conn.CreateChannelAsync())
{
await ch.QueueDeleteAsync(_queueName);
await ch.CloseAsync();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,
consumer.ReceivedAsync += MessageReceived;
await _channel.BasicConsumeAsync(queueName, true, consumer);

using (IChannel pubCh = await _conn.CreateChannelAsync())
await using (IChannel pubCh = await _conn.CreateChannelAsync())
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
await pubCh.CloseAsync();
Expand All @@ -106,7 +106,7 @@ await _channel.ExchangeDeclareAsync(exchange: exchangeName,

await CloseAndWaitForRecoveryAsync();

using (IChannel pubCh = await _conn.CreateChannelAsync())
await using (IChannel pubCh = await _conn.CreateChannelAsync())
{
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
await pubCh.CloseAsync();
Expand Down
2 changes: 1 addition & 1 deletion projects/Test/Integration/Integration.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<AssemblyOriginatorKeyFile>../../rabbit.snk</AssemblyOriginatorKeyFile>
<SignAssembly>true</SignAssembly>
<IsTestProject>true</IsTestProject>
<LangVersion>8.0</LangVersion>
<LangVersion>9.0</LangVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Loading