Skip to content

Commit 79659d9

Browse files
committed
Separated out connection recovery tests
This will allow these slow tests to run in parallel (cherry picked from commit d3d7195)
1 parent bf86447 commit 79659d9

11 files changed

+1164
-741
lines changed
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Threading.Tasks;
34+
using RabbitMQ.Client;
35+
using RabbitMQ.Client.Impl;
36+
using Xunit;
37+
using Xunit.Abstractions;
38+
using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk;
39+
40+
namespace Test.Integration.ConnectionRecovery
41+
{
42+
public class TestBasicAckAndBasicNack : TestConnectionRecoveryBase
43+
{
44+
private readonly string _queueName;
45+
46+
public TestBasicAckAndBasicNack(ITestOutputHelper output) : base(output)
47+
{
48+
_queueName = $"{nameof(TestBasicAckAndBasicNack)}-{Guid.NewGuid()}";
49+
}
50+
51+
public override async Task DisposeAsync()
52+
{
53+
ConnectionFactory cf = CreateConnectionFactory();
54+
cf.ClientProvidedName += "-TearDown";
55+
using (IConnection conn = await cf.CreateConnectionAsync())
56+
{
57+
using (IChannel ch = await conn.CreateChannelAsync())
58+
{
59+
await ch.QueueDeleteAsync(_queueName);
60+
await ch.CloseAsync();
61+
}
62+
await conn.CloseAsync();
63+
}
64+
65+
await base.DisposeAsync();
66+
}
67+
68+
[Fact]
69+
public async Task TestBasicAckAfterChannelRecovery()
70+
{
71+
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
72+
var cons = new AckingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs);
73+
74+
QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false);
75+
string queueName = q.QueueName;
76+
Assert.Equal(queueName, _queueName);
77+
78+
await _channel.BasicQosAsync(0, 1, false);
79+
await _channel.BasicConsumeAsync(queueName, false, cons);
80+
81+
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn);
82+
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn);
83+
84+
await PublishMessagesWhileClosingConnAsync(queueName);
85+
86+
await WaitAsync(sl, "connection shutdown");
87+
await WaitAsync(rl, "connection recovery");
88+
await WaitAsync(allMessagesSeenTcs, "all messages seen");
89+
}
90+
91+
[Fact]
92+
public async Task TestBasicNackAfterChannelRecovery()
93+
{
94+
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
95+
var cons = new NackingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs);
96+
97+
QueueDeclareOk q = await _channel.QueueDeclareAsync(_queueName, false, false, false);
98+
string queueName = q.QueueName;
99+
Assert.Equal(queueName, _queueName);
100+
101+
await _channel.BasicQosAsync(0, 1, false);
102+
await _channel.BasicConsumeAsync(queueName, false, cons);
103+
104+
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn);
105+
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn);
106+
107+
await PublishMessagesWhileClosingConnAsync(queueName);
108+
109+
await WaitAsync(sl, "connection shutdown");
110+
await WaitAsync(rl, "connection recovery");
111+
await WaitAsync(allMessagesSeenTcs, "all messages seen");
112+
}
113+
114+
[Fact]
115+
public async Task TestBasicRejectAfterChannelRecovery()
116+
{
117+
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
118+
var cons = new RejectingBasicConsumer(_channel, TotalMessageCount, allMessagesSeenTcs);
119+
120+
string queueName = (await _channel.QueueDeclareAsync(_queueName, false, false, false)).QueueName;
121+
Assert.Equal(queueName, _queueName);
122+
123+
await _channel.BasicQosAsync(0, 1, false);
124+
await _channel.BasicConsumeAsync(queueName, false, cons);
125+
126+
TaskCompletionSource<bool> sl = PrepareForShutdown(_conn);
127+
TaskCompletionSource<bool> rl = PrepareForRecovery(_conn);
128+
129+
await PublishMessagesWhileClosingConnAsync(queueName);
130+
131+
await WaitAsync(sl, "connection shutdown");
132+
await WaitAsync(rl, "connection recovery");
133+
await WaitAsync(allMessagesSeenTcs, "all messages seen");
134+
}
135+
136+
[Fact]
137+
public async Task TestBasicAckAfterBasicGetAndChannelRecovery()
138+
{
139+
string q = GenerateQueueName();
140+
await _channel.QueueDeclareAsync(q, false, false, false);
141+
// create an offset
142+
await _channel.BasicPublishAsync("", q, _messageBody);
143+
await Task.Delay(50);
144+
BasicGetResult g = await _channel.BasicGetAsync(q, false);
145+
await CloseAndWaitForRecoveryAsync();
146+
Assert.True(_conn.IsOpen);
147+
Assert.True(_channel.IsOpen);
148+
// ack the message after recovery - this should be out of range and ignored
149+
await _channel.BasicAckAsync(g.DeliveryTag, false);
150+
// do a sync operation to 'check' there is no channel exception
151+
await _channel.BasicGetAsync(q, false);
152+
}
153+
154+
[Fact]
155+
public async Task TestBasicAckEventHandlerRecovery()
156+
{
157+
await _channel.ConfirmSelectAsync();
158+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
159+
((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true);
160+
((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true);
161+
162+
await CloseAndWaitForRecoveryAsync();
163+
await CloseAndWaitForRecoveryAsync();
164+
Assert.True(_channel.IsOpen);
165+
166+
await WithTemporaryNonExclusiveQueueAsync(_channel, (ch, q) =>
167+
{
168+
return ch.BasicPublishAsync("", q, _messageBody).AsTask();
169+
});
170+
171+
await WaitAsync(tcs, "basic acks/nacks");
172+
}
173+
}
174+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Threading.Tasks;
33+
using RabbitMQ.Client;
34+
using Xunit;
35+
using Xunit.Abstractions;
36+
37+
namespace Test.Integration.ConnectionRecovery
38+
{
39+
public class TestBasicConnectionRecovery : TestConnectionRecoveryBase
40+
{
41+
public TestBasicConnectionRecovery(ITestOutputHelper output) : base(output)
42+
{
43+
}
44+
45+
[Fact]
46+
public async Task TestBasicConnectionRecoveryTest()
47+
{
48+
Assert.True(_conn.IsOpen);
49+
await CloseAndWaitForRecoveryAsync();
50+
Assert.True(_conn.IsOpen);
51+
}
52+
53+
[Fact]
54+
public async Task TestBasicChannelRecovery()
55+
{
56+
Assert.True(_channel.IsOpen);
57+
await CloseAndWaitForRecoveryAsync();
58+
Assert.True(_channel.IsOpen);
59+
}
60+
61+
[Fact]
62+
public Task TestClientNamedQueueRecovery()
63+
{
64+
string s = "dotnet-client.test.recovery.q1";
65+
return WithTemporaryNonExclusiveQueueAsync(_channel, async (m, q) =>
66+
{
67+
await CloseAndWaitForRecoveryAsync();
68+
await AssertQueueRecoveryAsync(m, q, false);
69+
await _channel.QueueDeleteAsync(q);
70+
}, s);
71+
}
72+
73+
[Fact]
74+
public Task TestClientNamedQueueRecoveryNoWait()
75+
{
76+
string s = "dotnet-client.test.recovery.q1-nowait";
77+
return WithTemporaryExclusiveQueueNoWaitAsync(_channel, async (ch, q) =>
78+
{
79+
await CloseAndWaitForRecoveryAsync();
80+
await AssertExclusiveQueueRecoveryAsync(ch, q);
81+
}, s);
82+
}
83+
}
84+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2020 VMware, Inc.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System;
33+
using System.Threading;
34+
using System.Threading.Tasks;
35+
using RabbitMQ.Client;
36+
using RabbitMQ.Client.Events;
37+
using Xunit;
38+
using Xunit.Abstractions;
39+
using QueueDeclareOk = RabbitMQ.Client.QueueDeclareOk;
40+
41+
namespace Test.Integration.ConnectionRecovery
42+
{
43+
public class TestConnectionRecovery : TestConnectionRecoveryBase
44+
{
45+
public TestConnectionRecovery(ITestOutputHelper output) : base(output)
46+
{
47+
}
48+
49+
[Fact]
50+
public async Task TestBindingRecovery_GH1035()
51+
{
52+
const string routingKey = "unused";
53+
byte[] body = GetRandomBody();
54+
55+
var receivedMessageSemaphore = new SemaphoreSlim(0, 1);
56+
57+
Task MessageReceived(object sender, BasicDeliverEventArgs e)
58+
{
59+
receivedMessageSemaphore.Release();
60+
return Task.CompletedTask;
61+
}
62+
63+
string exchangeName = $"ex-gh-1035-{Guid.NewGuid()}";
64+
string queueName = $"q-gh-1035-{Guid.NewGuid()}";
65+
66+
await _channel.ExchangeDeclareAsync(exchange: exchangeName,
67+
type: "fanout", durable: false, autoDelete: true,
68+
arguments: null);
69+
70+
QueueDeclareOk q0 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true);
71+
Assert.Equal(queueName, q0);
72+
73+
await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey);
74+
75+
await _channel.CloseAsync();
76+
_channel.Dispose();
77+
_channel = null;
78+
79+
_channel = await _conn.CreateChannelAsync();
80+
81+
await _channel.ExchangeDeclareAsync(exchange: exchangeName,
82+
type: "fanout", durable: false, autoDelete: true,
83+
arguments: null);
84+
85+
QueueDeclareOk q1 = await _channel.QueueDeclareAsync(queue: queueName, exclusive: true);
86+
Assert.Equal(queueName, q1.QueueName);
87+
88+
await _channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey);
89+
90+
var c = new AsyncEventingBasicConsumer(_channel);
91+
c.Received += MessageReceived;
92+
await _channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: c);
93+
94+
using (IChannel pubCh = await _conn.CreateChannelAsync())
95+
{
96+
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
97+
await pubCh.CloseAsync();
98+
}
99+
100+
Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan));
101+
102+
await CloseAndWaitForRecoveryAsync();
103+
104+
using (IChannel pubCh = await _conn.CreateChannelAsync())
105+
{
106+
await pubCh.BasicPublishAsync(exchange: exchangeName, routingKey: "unused", body: body);
107+
await pubCh.CloseAsync();
108+
}
109+
110+
Assert.True(await receivedMessageSemaphore.WaitAsync(WaitSpan));
111+
}
112+
}
113+
}

0 commit comments

Comments
 (0)