Skip to content

Commit ee66465

Browse files
authored
Merge pull request #1665 from rabbitmq/lukebakken/concurrent-channel-test
Add test to demonstrate `IChannel` thread-safety
2 parents 5d49028 + a948116 commit ee66465

File tree

5 files changed

+298
-206
lines changed

5 files changed

+298
-206
lines changed

projects/Test/Common/IntegrationFixture.cs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -485,9 +485,11 @@ protected static void AssertPreconditionFailed(ShutdownEventArgs args)
485485
AssertShutdownError(args, Constants.PreconditionFailed);
486486
}
487487

488-
protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
488+
protected async Task AssertRanToCompletion(IEnumerable<Task> tasks)
489489
{
490-
return DoAssertRanToCompletion(tasks);
490+
Task whenAllTask = Task.WhenAll(tasks);
491+
await whenAllTask.WaitAsync(LongWaitSpan);
492+
Assert.True(whenAllTask.IsCompletedSuccessfully());
491493
}
492494

493495
internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
@@ -598,13 +600,6 @@ private static int GetConnectionIdx()
598600
return Interlocked.Increment(ref _connectionIdx);
599601
}
600602

601-
private static async Task DoAssertRanToCompletion(IEnumerable<Task> tasks)
602-
{
603-
Task whenAllTask = Task.WhenAll(tasks);
604-
await whenAllTask;
605-
Assert.True(whenAllTask.IsCompletedSuccessfully());
606-
}
607-
608603
protected static string GetUniqueString(ushort length)
609604
{
610605
byte[] bytes = GetRandomBody(length);
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Generic;
34+
using System.Threading.Tasks;
35+
using Xunit;
36+
using Xunit.Abstractions;
37+
38+
namespace Test.Integration
39+
{
40+
public class TestConcurrentAccessBase : IntegrationFixture
41+
{
42+
protected const ushort _messageCount = 200;
43+
44+
public TestConcurrentAccessBase(ITestOutputHelper output,
45+
ushort consumerDispatchConcurrency = 1,
46+
bool openChannel = true) : base(output, consumerDispatchConcurrency, openChannel)
47+
{
48+
}
49+
50+
protected async Task TestConcurrentOperationsAsync(Func<Task> action, int iterations)
51+
{
52+
var tasks = new List<Task>();
53+
for (int i = 0; i < _processorCount; i++)
54+
{
55+
for (int j = 0; j < iterations; j++)
56+
{
57+
await Task.Delay(S_Random.Next(1, 10));
58+
tasks.Add(action());
59+
}
60+
}
61+
await AssertRanToCompletion(tasks);
62+
63+
// incorrect frame interleaving in these tests will result
64+
// in an unrecoverable connection-level exception, thus
65+
// closing the connection
66+
Assert.True(_conn.IsOpen);
67+
}
68+
}
69+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Collections.Concurrent;
34+
using System.Collections.Generic;
35+
using System.Linq;
36+
using System.Threading;
37+
using System.Threading.Tasks;
38+
using RabbitMQ.Client;
39+
using RabbitMQ.Client.Events;
40+
using Xunit;
41+
using Xunit.Abstractions;
42+
43+
namespace Test.Integration
44+
{
45+
public class TestConcurrentAccessWithSharedChannel : TestConcurrentAccessBase
46+
{
47+
private const int Iterations = 10;
48+
49+
public TestConcurrentAccessWithSharedChannel(ITestOutputHelper output)
50+
: base(output)
51+
{
52+
}
53+
54+
[Fact]
55+
public async Task ConcurrentPublishSingleChannel()
56+
{
57+
int publishAckCount = 0;
58+
59+
_channel.BasicAcks += (object sender, BasicAckEventArgs e) =>
60+
{
61+
Interlocked.Increment(ref publishAckCount);
62+
};
63+
64+
_channel.BasicNacks += (object sender, BasicNackEventArgs e) =>
65+
{
66+
_output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
67+
};
68+
69+
await _channel.ConfirmSelectAsync(trackConfirmations: false);
70+
71+
await TestConcurrentOperationsAsync(async () =>
72+
{
73+
long receivedCount = 0;
74+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
75+
76+
var msgTracker = new ConcurrentDictionary<ushort, bool>();
77+
78+
QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty, passive: false,
79+
durable: false, exclusive: true, autoDelete: true, arguments: null);
80+
81+
var consumer = new AsyncEventingBasicConsumer(_channel);
82+
83+
consumer.Received += async (object sender, BasicDeliverEventArgs ea) =>
84+
{
85+
try
86+
{
87+
System.Diagnostics.Debug.Assert(object.ReferenceEquals(sender, consumer));
88+
ushort idx = ushort.Parse(_encoding.GetString(ea.Body.ToArray()));
89+
Assert.False(msgTracker[idx]);
90+
msgTracker[idx] = true;
91+
92+
var cons = (AsyncEventingBasicConsumer)sender;
93+
IChannel ch = cons.Channel;
94+
await ch.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
95+
96+
if (Interlocked.Increment(ref receivedCount) == _messageCount)
97+
{
98+
if (msgTracker.Values.Any(v => v == false))
99+
{
100+
tcs.SetResult(false);
101+
}
102+
else
103+
{
104+
tcs.SetResult(true);
105+
}
106+
}
107+
}
108+
catch (Exception ex)
109+
{
110+
tcs.SetException(ex);
111+
}
112+
};
113+
114+
await _channel.BasicConsumeAsync(queue: q.QueueName, autoAck: false, consumer);
115+
116+
var publishTasks = new List<ValueTask>();
117+
for (ushort i = 0; i < _messageCount; i++)
118+
{
119+
msgTracker[i] = false;
120+
byte[] body = _encoding.GetBytes(i.ToString());
121+
publishTasks.Add(_channel.BasicPublishAsync("", q.QueueName, mandatory: true, body: body));
122+
}
123+
124+
foreach (ValueTask pt in publishTasks)
125+
{
126+
await pt;
127+
}
128+
129+
Assert.True(await tcs.Task);
130+
}, Iterations);
131+
132+
_output.WriteLine("@@@@@@@@ PUBLISH ACK COUNT: {0}", publishAckCount);
133+
}
134+
}
135+
}

projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs

Lines changed: 90 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,67 +30,131 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Collections.Generic;
33+
using System.Threading;
3434
using System.Threading.Tasks;
3535
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
3637
using Xunit;
3738
using Xunit.Abstractions;
3839

3940
namespace Test.Integration
4041
{
41-
public class TestConcurrentAccessWithSharedConnection : IntegrationFixture
42+
public class TestConcurrentAccessWithSharedConnection : TestConcurrentAccessBase
4243
{
4344
public TestConcurrentAccessWithSharedConnection(ITestOutputHelper output)
44-
: base(output)
45+
: base(output, openChannel: false)
4546
{
4647
}
4748

4849
public override async Task InitializeAsync()
4950
{
5051
_connFactory = CreateConnectionFactory();
5152
_conn = await _connFactory.CreateConnectionAsync();
53+
_conn.ConnectionShutdown += HandleConnectionShutdown;
5254
// NB: not creating _channel because this test suite doesn't use it.
5355
Assert.Null(_channel);
5456
}
5557

5658
[Fact]
5759
public async Task TestConcurrentChannelOpenCloseLoop()
5860
{
59-
await TestConcurrentChannelOperationsAsync(async (conn) =>
61+
await TestConcurrentOperationsAsync(async () =>
6062
{
61-
using (IChannel ch = await conn.CreateChannelAsync())
63+
using (IChannel ch = await _conn.CreateChannelAsync())
6264
{
6365
await ch.CloseAsync();
6466
}
6567
}, 50);
6668
}
6769

68-
private async Task TestConcurrentChannelOperationsAsync(Func<IConnection, Task> action, int iterations)
70+
[Fact]
71+
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
72+
{
73+
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(Array.Empty<byte>(), 30);
74+
}
75+
76+
[Fact]
77+
public Task TestConcurrentChannelOpenAndPublishingSize64Async()
6978
{
70-
var tasks = new List<Task>();
71-
for (int i = 0; i < _processorCount; i++)
79+
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(64);
80+
}
81+
82+
[Fact]
83+
public Task TestConcurrentChannelOpenAndPublishingSize256Async()
84+
{
85+
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(256);
86+
}
87+
88+
[Fact]
89+
public Task TestConcurrentChannelOpenAndPublishingSize1024Async()
90+
{
91+
return TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(1024);
92+
}
93+
94+
private Task TestConcurrentChannelOpenAndPublishingWithBodyOfSizeAsync(ushort length, int iterations = 30)
95+
{
96+
byte[] body = GetRandomBody(length);
97+
return TestConcurrentChannelOpenAndPublishingWithBodyAsync(body, iterations);
98+
}
99+
100+
private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, int iterations)
101+
{
102+
return TestConcurrentOperationsAsync(async () =>
72103
{
73-
tasks.Add(Task.Run(() =>
104+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
105+
var tokenSource = new CancellationTokenSource(LongWaitSpan);
106+
CancellationTokenRegistration ctsr = tokenSource.Token.Register(() =>
74107
{
75-
var subTasks = new List<Task>();
76-
for (int j = 0; j < iterations; j++)
108+
tcs.TrySetResult(false);
109+
});
110+
111+
try
112+
{
113+
using (IChannel ch = await _conn.CreateChannelAsync())
77114
{
78-
subTasks.Add(action(_conn));
115+
ch.ChannelShutdown += (o, ea) =>
116+
{
117+
HandleChannelShutdown(ch, ea, (args) =>
118+
{
119+
if (args.Initiator != ShutdownInitiator.Application)
120+
{
121+
tcs.TrySetException(args.Exception);
122+
}
123+
});
124+
};
125+
126+
await ch.ConfirmSelectAsync(trackConfirmations: false);
127+
128+
ch.BasicAcks += (object sender, BasicAckEventArgs e) =>
129+
{
130+
if (e.DeliveryTag >= _messageCount)
131+
{
132+
tcs.SetResult(true);
133+
}
134+
};
135+
136+
ch.BasicNacks += (object sender, BasicNackEventArgs e) =>
137+
{
138+
tcs.SetResult(false);
139+
_output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
140+
};
141+
142+
QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);
143+
for (ushort j = 0; j < _messageCount; j++)
144+
{
145+
await ch.BasicPublishAsync("", q.QueueName, mandatory: true, body: body);
146+
}
147+
148+
Assert.True(await tcs.Task);
149+
await ch.CloseAsync();
79150
}
80-
return Task.WhenAll(subTasks);
81-
}));
82-
}
83-
84-
Task whenTask = Task.WhenAll(tasks);
85-
await whenTask.WaitAsync(LongWaitSpan);
86-
Assert.True(whenTask.IsCompleted);
87-
Assert.False(whenTask.IsCanceled);
88-
Assert.False(whenTask.IsFaulted);
89-
90-
// incorrect frame interleaving in these tests will result
91-
// in an unrecoverable connection-level exception, thus
92-
// closing the connection
93-
Assert.True(_conn.IsOpen);
151+
}
152+
finally
153+
{
154+
tokenSource.Dispose();
155+
ctsr.Dispose();
156+
}
157+
}, iterations);
94158
}
95159
}
96160
}

0 commit comments

Comments
 (0)