Skip to content

Address ObjectDisposedException #1809

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 30 additions & 6 deletions projects/Applications/CreateChannel/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@

using System;
using System.Diagnostics;
using System.Threading;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Exceptions;

namespace CreateChannel
{
Expand All @@ -44,11 +46,11 @@ public static class Program
private const int ChannelsToOpen = 50;

private static int channelsOpened;
private static AutoResetEvent doneEvent;
private readonly static TaskCompletionSource<bool> s_tcs = new();

public static async Task Main()
{
doneEvent = new AutoResetEvent(false);
AppDomain.CurrentDomain.FirstChanceException += CurrentDomain_FirstChanceException;

var connectionFactory = new ConnectionFactory { };
await using IConnection connection = await connectionFactory.CreateConnectionAsync();
Expand All @@ -67,26 +69,48 @@ public static async Task Main()

for (int j = 0; j < channels.Length; j++)
{
if (j % 2 == 0)
{
try
{
await channels[j].QueueDeclarePassiveAsync(Guid.NewGuid().ToString());
}
catch (Exception)
{
}
}
await channels[j].DisposeAsync();
}
}

doneEvent.Set();
s_tcs.SetResult(true);
});

Console.WriteLine($"{Repeats} times opening {ChannelsToOpen} channels on a connection. => Total channel open/close: {Repeats * ChannelsToOpen}");
Console.WriteLine();
Console.WriteLine("Opened");
while (!doneEvent.WaitOne(500))
while (false == s_tcs.Task.IsCompleted)
{
await Task.Delay(500);
Console.WriteLine($"{channelsOpened,5}");
}
watch.Stop();
Console.WriteLine($"{channelsOpened,5}");
Console.WriteLine();
Console.WriteLine($"Took {watch.Elapsed.TotalMilliseconds} ms");
}

private static string Now => DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture);

Console.ReadLine();
private static void CurrentDomain_FirstChanceException(object sender, FirstChanceExceptionEventArgs e)
{
if (e.Exception is OperationInterruptedException)
{
}
else
{
Console.Error.WriteLine("{0} [ERROR] {1}", Now, e.Exception);
}
}
}
}
23 changes: 23 additions & 0 deletions projects/RabbitMQ.Client/Impl/AsyncRpcContinuations.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public ConfiguredTaskAwaitable<T>.ConfiguredTaskAwaiter GetAwaiter()
return _tcsConfiguredTaskAwaitable.GetAwaiter();
}

public abstract ProtocolCommandId[] HandledProtocolCommandIds { get; }

public async Task HandleCommandAsync(IncomingCommand cmd)
{
try
Expand Down Expand Up @@ -203,6 +205,9 @@ public ConnectionSecureOrTuneAsyncRpcContinuation(TimeSpan continuationTimeout,
{
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.ConnectionSecure, ProtocolCommandId.ConnectionTune];

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.ConnectionSecure)
Expand Down Expand Up @@ -240,6 +245,9 @@ public SimpleAsyncRpcContinuation(ProtocolCommandId expectedCommandId, TimeSpan
_expectedCommandId = expectedCommandId;
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [_expectedCommandId];

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == _expectedCommandId)
Expand Down Expand Up @@ -297,6 +305,9 @@ public BasicConsumeAsyncRpcContinuation(IAsyncBasicConsumer consumer, IConsumerD
_consumerDispatcher = consumerDispatcher;
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.BasicConsumeOk];

protected override async Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.BasicConsumeOk)
Expand Down Expand Up @@ -326,6 +337,9 @@ public BasicGetAsyncRpcContinuation(Func<ulong, ulong> adjustDeliveryTag,
_adjustDeliveryTag = adjustDeliveryTag;
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.BasicGetOk, ProtocolCommandId.BasicGetEmpty];

internal DateTime StartTime { get; } = DateTime.UtcNow;

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
Expand Down Expand Up @@ -441,6 +455,9 @@ public QueueDeclareAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellati
{
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.QueueDeclareOk];

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.QueueDeclareOk)
Expand Down Expand Up @@ -481,6 +498,9 @@ public QueueDeleteAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellatio
{
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.QueueDeleteOk];

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.QueueDeleteOk)
Expand All @@ -504,6 +524,9 @@ public QueuePurgeAsyncRpcContinuation(TimeSpan continuationTimeout, Cancellation
{
}

public override ProtocolCommandId[] HandledProtocolCommandIds
=> [ProtocolCommandId.QueuePurgeOk];

protected override Task DoHandleCommandAsync(IncomingCommand cmd)
{
if (cmd.CommandId == ProtocolCommandId.QueuePurgeOk)
Expand Down
Loading