Skip to content

Commit 77dd240

Browse files
authored
Merge pull request #1588 from rabbitmq/rabbitmq-dotnet-client-1573
Change test to match code provided by @neilgreatorex
2 parents 11fbf3b + a6e188a commit 77dd240

File tree

2 files changed

+36
-34
lines changed

2 files changed

+36
-34
lines changed

projects/Test/SequentialIntegration/SequentialIntegrationFixture.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,15 @@ public SequentialIntegrationFixture(ITestOutputHelper output) : base(output)
4242
{
4343
}
4444

45+
public async Task BlockAsync()
46+
{
47+
await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark absolute 10");
48+
await Task.Delay(TimeSpan.FromSeconds(1));
49+
}
50+
4551
public async Task BlockAsync(IChannel channel)
4652
{
47-
await _rabbitMQCtl.ExecRabbitMQCtlAsync("set_vm_memory_high_watermark 0.000000001");
48-
// give rabbitmqctl some time to do its job
49-
await Task.Delay(TimeSpan.FromSeconds(5));
53+
await BlockAsync();
5054
await channel.BasicPublishAsync(exchange: "amq.direct",
5155
routingKey: Guid.NewGuid().ToString(), _encoding.GetBytes("message"));
5256
}

projects/Test/SequentialIntegration/TestConnectionBlockedChannelLeak.cs

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Collections.Generic;
3433
using System.Threading;
3534
using System.Threading.Tasks;
3635
using RabbitMQ.Client;
@@ -49,14 +48,6 @@ public TestConnectionBlockedChannelLeak(ITestOutputHelper output) : base(output)
4948
public override async Task InitializeAsync()
5049
{
5150
await UnblockAsync();
52-
_connFactory = new ConnectionFactory
53-
{
54-
AutomaticRecoveryEnabled = true,
55-
ClientProvidedName = _testDisplayName,
56-
ContinuationTimeout = TimeSpan.FromSeconds(2)
57-
};
58-
_conn = await _connFactory.CreateConnectionAsync();
59-
_channel = await _conn.CreateChannelAsync();
6051
}
6152

6253
public override async Task DisposeAsync()
@@ -68,49 +59,55 @@ public override async Task DisposeAsync()
6859
[Fact]
6960
public async Task TestConnectionBlockedChannelLeak_GH1573()
7061
{
71-
string exchangeName = GenerateExchangeName();
62+
await BlockAsync();
7263

73-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
64+
var connectionBlockedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
65+
var connectionUnblockedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
7466

7567
using var cts = new CancellationTokenSource(WaitSpan);
7668
using CancellationTokenRegistration ctr = cts.Token.Register(() =>
7769
{
78-
tcs.TrySetCanceled();
70+
connectionBlockedTcs.TrySetCanceled();
71+
connectionUnblockedTcs.TrySetCanceled();
7972
});
8073

74+
_connFactory = new ConnectionFactory
75+
{
76+
AutomaticRecoveryEnabled = true,
77+
ClientProvidedName = _testDisplayName,
78+
ContinuationTimeout = TimeSpan.FromSeconds(2)
79+
};
80+
_conn = await _connFactory.CreateConnectionAsync();
81+
_channel = await _conn.CreateChannelAsync();
82+
83+
string exchangeName = GenerateExchangeName();
84+
8185
_conn.ConnectionBlocked += (object sender, ConnectionBlockedEventArgs args) =>
8286
{
83-
UnblockAsync();
87+
connectionBlockedTcs.SetResult(true);
8488
};
8589

8690
_conn.ConnectionUnblocked += (object sender, EventArgs ea) =>
8791
{
88-
tcs.SetResult(true);
92+
connectionUnblockedTcs.SetResult(true);
8993
};
9094

91-
await BlockAsync(_channel);
92-
93-
using (IChannel publishChannel = await _conn.CreateChannelAsync())
95+
async Task ExchangeDeclareAndPublish()
9496
{
95-
await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
96-
await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true);
97-
await publishChannel.CloseAsync();
97+
using (IChannel publishChannel = await _conn.CreateChannelAsync())
98+
{
99+
await publishChannel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
100+
await publishChannel.BasicPublishAsync(exchangeName, exchangeName, GetRandomBody(), mandatory: true);
101+
await publishChannel.CloseAsync();
102+
}
98103
}
104+
await Assert.ThrowsAnyAsync<OperationCanceledException>(ExchangeDeclareAndPublish);
99105

100-
var channels = new List<IChannel>();
101106
for (int i = 1; i <= 5; i++)
102107
{
103-
IChannel c = await _conn.CreateChannelAsync();
104-
channels.Add(c);
108+
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => _conn.CreateChannelAsync());
105109
}
106110

107-
/*
108-
* Note:
109-
* This wait probably isn't necessary, if the above CreateChannelAsync
110-
* calls were to timeout, we'd get exceptions on the await
111-
*/
112-
await Task.Delay(TimeSpan.FromSeconds(5));
113-
114111
// Note: debugging
115112
// var rmq = new RabbitMQCtl(_output);
116113
// string output = await rmq.ExecRabbitMQCtlAsync("list_channels");
@@ -121,7 +118,8 @@ public async Task TestConnectionBlockedChannelLeak_GH1573()
121118
// output = await rmq.ExecRabbitMQCtlAsync("list_channels");
122119
// _output.WriteLine("CHANNELS 1: {0}", output);
123120

124-
Assert.True(await tcs.Task, "Unblock notification not received.");
121+
Assert.True(await connectionBlockedTcs.Task, "Blocked notification not received.");
122+
Assert.True(await connectionUnblockedTcs.Task, "Unblocked notification not received.");
125123
}
126124
}
127125
}

0 commit comments

Comments
 (0)