Skip to content

Commit 9bb4995

Browse files
committed
Use CreateConnectionAsync in AsyncIntegration
1 parent 35b836d commit 9bb4995

15 files changed

+55
-28
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,14 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end
8080

8181
internal IConnection Open()
8282
{
83-
return InnerConnection.Open();
83+
InnerConnection.Open();
84+
return this;
8485
}
8586

86-
internal ValueTask<IConnection> OpenAsync()
87+
internal async ValueTask<IConnection> OpenAsync()
8788
{
88-
return InnerConnection.OpenAsync();
89+
await InnerConnection.OpenAsync();
90+
return this;
8991
}
9092

9193
public event EventHandler<EventArgs> RecoverySucceeded

projects/Test/AsyncIntegration/TestAsyncConsumer.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public async Task InitializeAsync()
5656
_connFactory = CreateConnectionFactory();
5757
_connFactory.DispatchConsumersAsync = true;
5858
_connFactory.ConsumerDispatchConcurrency = 2;
59-
_conn = _connFactory.CreateConnection();
59+
_conn = await _connFactory.CreateConnectionAsync();
60+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
6061
_channel = await _conn.CreateChannelAsync();
6162
}
6263

@@ -254,7 +255,7 @@ public async Task TestBasicRejectAsync()
254255
var cf = CreateConnectionFactory();
255256
cf.DispatchConsumersAsync = true;
256257

257-
using IConnection connection = cf.CreateConnection();
258+
using IConnection connection = await cf.CreateConnectionAsync();
258259
using IChannel channel = await connection.CreateChannelAsync();
259260

260261
connection.ConnectionShutdown += (o, ea) =>
@@ -337,7 +338,7 @@ public async Task TestBasicAckAsync()
337338
var cf = CreateConnectionFactory();
338339
cf.DispatchConsumersAsync = true;
339340

340-
using IConnection connection = cf.CreateConnection();
341+
using IConnection connection = await cf.CreateConnectionAsync();
341342
using IChannel channel = await connection.CreateChannelAsync();
342343

343344
connection.ConnectionShutdown += (o, ea) =>
@@ -405,7 +406,7 @@ public async Task TestBasicNackAsync()
405406
var cf = CreateConnectionFactory();
406407
cf.DispatchConsumersAsync = true;
407408

408-
using IConnection connection = cf.CreateConnection();
409+
using IConnection connection = await cf.CreateConnectionAsync();
409410
using IChannel channel = await connection.CreateChannelAsync();
410411

411412
connection.ConnectionShutdown += (o, ea) =>

projects/Test/AsyncIntegration/TestAsyncConsumerExceptions.cs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public async Task InitializeAsync()
5656
{
5757
_connFactory = CreateConnectionFactory();
5858
_connFactory.DispatchConsumersAsync = true;
59-
_conn = _connFactory.CreateConnection();
59+
_conn = await _connFactory.CreateConnectionAsync();
60+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
6061
_channel = await _conn.CreateChannelAsync();
6162
}
6263

@@ -118,22 +119,23 @@ await TestExceptionHandlingWith(consumer, (ch, q, c, ct) =>
118119
protected async Task TestExceptionHandlingWith(IBasicConsumer consumer,
119120
Func<IChannel, string, IBasicConsumer, string, ValueTask> action)
120121
{
121-
var semaphore = new SemaphoreSlim(0, 1);
122-
bool notified = false;
122+
var waitSpan = TimeSpan.FromSeconds(2);
123+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
124+
var cts = new CancellationTokenSource(waitSpan);
125+
cts.Token.Register(() => tcs.TrySetResult(false));
126+
123127
string q = await _channel.QueueDeclareAsync(string.Empty, false, false, true, false, null);
124128
_channel.CallbackException += (ch, evt) =>
125129
{
126130
if (evt.Exception == TestException)
127131
{
128-
notified = true;
129-
semaphore.Release();
132+
tcs.SetResult(true);
130133
}
131134
};
132135

133136
string tag = await _channel.BasicConsumeAsync(q, true, string.Empty, false, false, null, consumer);
134137
await action(_channel, q, consumer, tag);
135-
Assert.True(await semaphore.WaitAsync(TimeSpan.FromSeconds(2)));
136-
Assert.True(notified);
138+
Assert.True(await tcs.Task);
137139
}
138140

139141
private class ConsumerFailingOnDelivery : AsyncEventingBasicConsumer

projects/Test/AsyncIntegration/TestBasicGetAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ protected override void SetUp()
5050
public async Task InitializeAsync()
5151
{
5252
_connFactory = CreateConnectionFactory();
53-
_conn = _connFactory.CreateConnection();
53+
_conn = await _connFactory.CreateConnectionAsync();
54+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5455
_channel = await _conn.CreateChannelAsync();
5556
}
5657

projects/Test/AsyncIntegration/TestBasicPublishAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ protected override void SetUp()
5151
public async Task InitializeAsync()
5252
{
5353
_connFactory = CreateConnectionFactory();
54-
_conn = _connFactory.CreateConnection();
54+
_conn = await _connFactory.CreateConnectionAsync();
55+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5556
_channel = await _conn.CreateChannelAsync();
5657
}
5758

projects/Test/AsyncIntegration/TestConcurrentAccessWithSharedConnectionAsync.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
namespace Test.AsyncIntegration
4242
{
43-
public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture
43+
public class TestConcurrentAccessWithSharedConnectionAsync : IntegrationFixture, IAsyncLifetime
4444
{
4545
private const ushort _messageCount = 200;
4646

@@ -49,14 +49,24 @@ public TestConcurrentAccessWithSharedConnectionAsync(ITestOutputHelper output) :
4949
}
5050

5151
protected override void SetUp()
52+
{
53+
}
54+
55+
public async Task InitializeAsync()
5256
{
5357
_connFactory = CreateConnectionFactory();
54-
_conn = _connFactory.CreateConnection();
58+
_conn = await _connFactory.CreateConnectionAsync();
59+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5560
_conn.ConnectionShutdown += HandleConnectionShutdown;
5661
// NB: not creating _channel because this test suite doesn't use it.
5762
Assert.Null(_channel);
5863
}
5964

65+
public Task DisposeAsync()
66+
{
67+
return Task.CompletedTask;
68+
}
69+
6070
[Fact]
6171
public Task TestConcurrentChannelOpenAndPublishingWithBlankMessagesAsync()
6272
{

projects/Test/AsyncIntegration/TestConfirmSelectAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ protected override void SetUp()
5050
public async Task InitializeAsync()
5151
{
5252
_connFactory = CreateConnectionFactory();
53-
_conn = _connFactory.CreateConnection();
53+
_conn = await _connFactory.CreateConnectionAsync();
54+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5455
_channel = await _conn.CreateChannelAsync();
5556
}
5657

projects/Test/AsyncIntegration/TestExchangeDeclareAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ protected override void SetUp()
5555
public async Task InitializeAsync()
5656
{
5757
_connFactory = CreateConnectionFactory();
58-
_conn = _connFactory.CreateConnection();
58+
_conn = await _connFactory.CreateConnectionAsync();
59+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5960
_channel = await _conn.CreateChannelAsync();
6061
}
6162

projects/Test/AsyncIntegration/TestExtensionsAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ protected override void SetUp()
5151
public async Task InitializeAsync()
5252
{
5353
_connFactory = CreateConnectionFactory();
54-
_conn = _connFactory.CreateConnection();
54+
_conn = await _connFactory.CreateConnectionAsync();
55+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5556
_channel = await _conn.CreateChannelAsync();
5657
}
5758

projects/Test/AsyncIntegration/TestFloodPublishingAsync.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public async Task TestUnthrottledFloodPublishingAsync()
6060
_connFactory = CreateConnectionFactory();
6161
_connFactory.RequestedHeartbeat = TimeSpan.FromSeconds(60);
6262
_connFactory.AutomaticRecoveryEnabled = false;
63-
_conn = _connFactory.CreateConnection();
63+
_conn = await _connFactory.CreateConnectionAsync();
64+
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
6465
_channel = await _conn.CreateChannelAsync();
6566

6667
_conn.ConnectionShutdown += (_, ea) =>
@@ -111,7 +112,8 @@ public async Task TestMultithreadFloodPublishingAsync()
111112
_connFactory.DispatchConsumersAsync = true;
112113
_connFactory.AutomaticRecoveryEnabled = false;
113114

114-
_conn = _connFactory.CreateConnection();
115+
_conn = await _connFactory.CreateConnectionAsync();
116+
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
115117
_channel = await _conn.CreateChannelAsync();
116118

117119
string message = "Hello from test TestMultithreadFloodPublishing";

projects/Test/AsyncIntegration/TestMessageCountAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ protected override void SetUp()
5050
public async Task InitializeAsync()
5151
{
5252
_connFactory = CreateConnectionFactory();
53-
_conn = _connFactory.CreateConnection();
53+
_conn = await _connFactory.CreateConnectionAsync();
54+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5455
_channel = await _conn.CreateChannelAsync();
5556
}
5657

projects/Test/AsyncIntegration/TestPassiveDeclareAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ protected override void SetUp()
5252
public async Task InitializeAsync()
5353
{
5454
_connFactory = CreateConnectionFactory();
55-
_conn = _connFactory.CreateConnection();
55+
_conn = await _connFactory.CreateConnectionAsync();
56+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5657
_channel = await _conn.CreateChannelAsync();
5758
}
5859

projects/Test/AsyncIntegration/TestPublishSharedChannelAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,9 @@ public async Task MultiThreadPublishOnSharedChannel()
6767
var cf = CreateConnectionFactory();
6868
cf.AutomaticRecoveryEnabled = false;
6969

70-
using (IConnection conn = cf.CreateConnection())
70+
using (IConnection conn = await cf.CreateConnectionAsync())
7171
{
72+
Assert.IsNotType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(conn);
7273
conn.ConnectionShutdown += HandleConnectionShutdown;
7374

7475
using (IChannel channel = await conn.CreateChannelAsync())

projects/Test/AsyncIntegration/TestPublisherConfirmsAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ protected override void SetUp()
5959
public async Task InitializeAsync()
6060
{
6161
_connFactory = CreateConnectionFactory();
62-
_conn = _connFactory.CreateConnection();
62+
_conn = await _connFactory.CreateConnectionAsync();
63+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
6364
_channel = await _conn.CreateChannelAsync();
6465
}
6566

projects/Test/AsyncIntegration/TestQueueDeclareAsync.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ protected override void SetUp()
5353
public async Task InitializeAsync()
5454
{
5555
_connFactory = CreateConnectionFactory();
56-
_conn = _connFactory.CreateConnection();
56+
_conn = await _connFactory.CreateConnectionAsync();
57+
Assert.IsType<RabbitMQ.Client.Framing.Impl.AutorecoveringConnection>(_conn);
5758
_channel = await _conn.CreateChannelAsync();
5859
}
5960

0 commit comments

Comments
 (0)