Skip to content

Change test to match code provided by @neilgreatorex #1588

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,15 @@ public SequentialIntegrationFixture(ITestOutputHelper output) : base(output)
{
}

public async Task BlockAsync()
{
await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark absolute 10");
await Task.Delay(TimeSpan.FromSeconds(1));
}

public async Task BlockAsync(IChannel channel)
{
await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark 0.000000001");
// give rabbitmqctl some time to do its job
await Task.Delay(TimeSpan.FromSeconds(5));
await BlockAsync();
await channel.BasicPublishAsync(exchange: "amq.direct",
routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//---------------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
Expand All @@ -49,14 +48,6 @@ public TestConnectionBlockedChannelLeak(ITestOutputHelper output) : base(output)
public override async Task InitializeAsync()
{
await UnblockAsync();
_connFactory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
ClientProvidedName = _testDisplayName,
ContinuationTimeout = TimeSpan.FromSeconds(2)
};
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync();
}

public override async Task DisposeAsync()
Expand All @@ -68,49 +59,55 @@ public override async Task DisposeAsync()
[Fact]
public async Task TestConnectionBlockedChannelLeak_GH1573()
{
string exchangeName = GenerateExchangeName();
await BlockAsync();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionBlockedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var connectionUnblockedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

using var cts = new CancellationTokenSource(WaitSpan);
using CancellationTokenRegistration ctr = cts.Token.Register(() =>
{
tcs.TrySetCanceled();
connectionBlockedTcs.TrySetCanceled();
connectionUnblockedTcs.TrySetCanceled();
});

_connFactory = new ConnectionFactory
{
AutomaticRecoveryEnabled = true,
ClientProvidedName = _testDisplayName,
ContinuationTimeout = TimeSpan.FromSeconds(2)
};
_conn = await _connFactory.CreateConnectionAsync();
_channel = await _conn.CreateChannelAsync();

string exchangeName = GenerateExchangeName();

_conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) =>
{
UnblockAsync();
connectionBlockedTcs.SetResult(true);
};

_conn.ConnectionUnblocked += (object sender, EventArgs ea) =>
{
tcs.SetResult(true);
connectionUnblockedTcs.SetResult(true);
};

await BlockAsync(_channel);

using (IChannel publishChannel = await _conn.CreateChannelAsync())
async Task ExchangeDeclareAndPublish()
{
await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true);
await publishChannel.CloseAsync();
using (IChannel publishChannel = await _conn.CreateChannelAsync())
{
await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true);
await publishChannel.CloseAsync();
}
}
await Assert.ThrowsAnyAsync<OperationCanceledException>(ExchangeDeclareAndPublish);

var channels = new List<IChannel>();
for (int i = 1; i <= 5; i++)
{
IChannel c = await _conn.CreateChannelAsync();
channels.Add(c);
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => _conn.CreateChannelAsync());
}

/*
* Note:
* This wait probably isn't necessary, if the above CreateChannelAsync
* calls were to timeout, we'd get exceptions on the await
*/
await Task.Delay(TimeSpan.FromSeconds(5));

// Note: debugging
// var rmq = new RabbitMQCtl(_output);
// string output = await rmq.ExecRabbitMQCtlAsync("list_channels");
Expand All @@ -121,7 +118,8 @@ public async Task TestConnectionBlockedChannelLeak_GH1573()
// output = await rmq.ExecRabbitMQCtlAsync("list_channels");
// _output.WriteLine("CHANNELS 1: {0}", output);

Assert.True(await tcs.Task, "Unblock notification not received.");
Assert.True(await connectionBlockedTcs.Task, "Blocked notification not received.");
Assert.True(await connectionUnblockedTcs.Task, "Unblocked notification not received.");
}
}
}