Skip to content

Use unique queue and exchange names #1678

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions projects/Test/Integration/TestConnectionRecoveryWithoutSetup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ public async Task TestCreateChannelOnClosedAutorecoveringConnectionDoesNotHang()
[Fact]
public async Task TestTopologyRecoveryConsumerFilter()
{
const string exchange = "topology.recovery.exchange";
const string queueWithRecoveredConsumer = "topology.recovery.queue.1";
const string queueWithIgnoredConsumer = "topology.recovery.queue.2";
string exchange = GenerateExchangeName();
string queueWithRecoveredConsumer = GenerateQueueName();
string queueWithIgnoredConsumer = GenerateQueueName();
const string binding1 = "recovered.binding.1";
const string binding2 = "recovered.binding.2";

Expand Down Expand Up @@ -385,22 +385,25 @@ public async Task TestTopologyRecoveryConsumerFilter()
[Fact]
public async Task TestRecoveryWithTopologyDisabled()
{
string queueName = GenerateQueueName() + "-dotnet-client.test.recovery.q2";

using (AutorecoveringConnection conn = await CreateAutorecoveringConnectionWithTopologyRecoveryDisabledAsync())
{
using (IChannel ch = await conn.CreateChannelAsync())
{
try
{
string s = "dotnet-client.test.recovery.q2";
await ch.QueueDeleteAsync(s);
await ch.QueueDeclareAsync(queue: s, durable: false, exclusive: true, autoDelete: false, arguments: null);
await ch.QueueDeclareAsync(queue: s, passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null);
await ch.QueueDeleteAsync(queueName);
await ch.QueueDeclareAsync(queue: queueName,
durable: false, exclusive: true, autoDelete: false, arguments: null);
await ch.QueueDeclareAsync(queue: queueName,
passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null);

Assert.True(ch.IsOpen);
await CloseAndWaitForRecoveryAsync(conn);

Assert.True(ch.IsOpen);
await ch.QueueDeclareAsync(queue: s, passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null);
await ch.QueueDeclareAsync(queue: queueName, passive: true, durable: false, exclusive: true, autoDelete: false, arguments: null);

Assert.Fail("Expected an exception");
}
Expand Down
53 changes: 30 additions & 23 deletions projects/Test/Integration/TestConnectionTopologyRecovery.cs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ public async Task TestRecoverTopologyOnDisposedChannel()
[Fact]
public async Task TestTopologyRecoveryQueueFilter()
{
string queueToRecover = GenerateQueueName();
string queueToIgnore = GenerateQueueName() + "-filtered.queue";

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var filter = new TopologyRecoveryFilter
Expand All @@ -103,8 +106,6 @@ public async Task TestTopologyRecoveryQueueFilter()
};
IChannel ch = await conn.CreateChannelAsync();

string queueToRecover = "recovered.queue";
string queueToIgnore = "filtered.queue";
await ch.QueueDeclareAsync(queueToRecover, false, false, false);
await ch.QueueDeclareAsync(queueToIgnore, false, false, false);

Expand Down Expand Up @@ -138,6 +139,9 @@ public async Task TestTopologyRecoveryQueueFilter()
[Fact]
public async Task TestTopologyRecoveryExchangeFilter()
{
string exchangeToRecover = GenerateExchangeName();
string exchangeToIgnore = GenerateExchangeName() + "-filtered.exchange";

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var filter = new TopologyRecoveryFilter
Expand All @@ -154,8 +158,6 @@ public async Task TestTopologyRecoveryExchangeFilter()
IChannel ch = await conn.CreateChannelAsync();
try
{
string exchangeToRecover = "recovered.exchange";
string exchangeToIgnore = "filtered.exchange";
await ch.ExchangeDeclareAsync(exchangeToRecover, "topic", false, true);
await ch.ExchangeDeclareAsync(exchangeToIgnore, "direct", false, true);

Expand Down Expand Up @@ -186,6 +188,12 @@ public async Task TestTopologyRecoveryExchangeFilter()
[Fact]
public async Task TestTopologyRecoveryBindingFilter()
{
string exchange = GenerateExchangeName();
string queueWithRecoveredBinding = GenerateQueueName();
string queueWithIgnoredBinding = GenerateQueueName();
const string bindingToRecover = "recovered.binding";
const string bindingToIgnore = "filtered.binding";

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var filter = new TopologyRecoveryFilter
Expand All @@ -204,12 +212,6 @@ public async Task TestTopologyRecoveryBindingFilter()

try
{
string exchange = "topology.recovery.exchange";
string queueWithRecoveredBinding = "topology.recovery.queue.1";
string queueWithIgnoredBinding = "topology.recovery.queue.2";
string bindingToRecover = "recovered.binding";
string bindingToIgnore = "filtered.binding";

await ch.ExchangeDeclareAsync(exchange, "direct");
await ch.QueueDeclareAsync(queueWithRecoveredBinding, false, false, false);
await ch.QueueDeclareAsync(queueWithIgnoredBinding, false, false, false);
Expand All @@ -230,6 +232,9 @@ public async Task TestTopologyRecoveryBindingFilter()
}
finally
{
await ch.ExchangeDeleteAsync(exchange);
await ch.QueueDeleteAsync(queueWithRecoveredBinding);
await ch.QueueDeleteAsync(queueWithIgnoredBinding);
await ch.CloseAsync();
await conn.CloseAsync();
ch.Dispose();
Expand All @@ -240,9 +245,9 @@ public async Task TestTopologyRecoveryBindingFilter()
[Fact]
public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
{
const string exchange = "topology.recovery.exchange";
const string queue1 = "topology.recovery.queue.1";
const string queue2 = "topology.recovery.queue.2";
string exchange = GenerateExchangeName();
string queue1 = GenerateQueueName();
string queue2 = GenerateQueueName();
const string binding1 = "recovered.binding";
const string binding2 = "filtered.binding";

Expand Down Expand Up @@ -329,6 +334,9 @@ public async Task TestTopologyRecoveryDefaultFilterRecoversAllEntities()
[Fact]
public async Task TestTopologyRecoveryQueueExceptionHandler()
{
string queueToRecoverWithException = GenerateQueueName() + "-recovery.exception.queue";
string queueToRecoverSuccessfully = GenerateQueueName() + "-successfully.recovered.queue";

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var changedQueueArguments = new Dictionary<string, object>
Expand Down Expand Up @@ -363,8 +371,6 @@ await channel.QueueDeclareAsync(rq.Name, false, false, false,
};
IChannel ch = await conn.CreateChannelAsync();

string queueToRecoverWithException = "recovery.exception.queue";
string queueToRecoverSuccessfully = "successfully.recovered.queue";
await ch.QueueDeclareAsync(queueToRecoverWithException, false, false, false);
await ch.QueueDeclareAsync(queueToRecoverSuccessfully, false, false, false);

Expand Down Expand Up @@ -395,6 +401,9 @@ await _channel.QueueDeclareAsync(queueToRecoverWithException, false, false, fals
[Fact]
public async Task TestTopologyRecoveryExchangeExceptionHandler()
{
string exchangeToRecoverWithException = GenerateExchangeName() + "-recovery.exception.exchange";
string exchangeToRecoverSuccessfully = GenerateExchangeName() + "-successfully.recovered.exchange";

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var exceptionHandler = new TopologyRecoveryExceptionHandler
Expand Down Expand Up @@ -422,8 +431,6 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler()
return Task.CompletedTask;
};

string exchangeToRecoverWithException = "recovery.exception.exchange";
string exchangeToRecoverSuccessfully = "successfully.recovered.exchange";
IChannel ch = await conn.CreateChannelAsync();
await ch.ExchangeDeclareAsync(exchangeToRecoverWithException, "direct", false, false);
await ch.ExchangeDeclareAsync(exchangeToRecoverSuccessfully, "direct", false, false);
Expand Down Expand Up @@ -455,12 +462,12 @@ public async Task TestTopologyRecoveryExchangeExceptionHandler()
[Fact]
public async Task TestTopologyRecoveryBindingExceptionHandler()
{
var connectionRecoveryTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

const string exchange = "topology.recovery.exchange";
const string queueWithExceptionBinding = "recovery.exception.queue";
string exchange = GenerateExchangeName();
string queueWithExceptionBinding = GenerateQueueName();
const string bindingToRecoverWithException = "recovery.exception.binding";

var connectionRecoveryTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var exceptionHandler = new TopologyRecoveryExceptionHandler
{
BindingRecoveryExceptionCondition = (b, ex) =>
Expand Down Expand Up @@ -520,9 +527,9 @@ public async Task TestTopologyRecoveryBindingExceptionHandler()
[Fact]
public async Task TestTopologyRecoveryConsumerExceptionHandler()
{
var connectionRecoveryTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
string queueWithExceptionConsumer = GenerateQueueName() + "-recovery.exception.queue";

string queueWithExceptionConsumer = "recovery.exception.queue";
var connectionRecoveryTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

var exceptionHandler = new TopologyRecoveryExceptionHandler
{
Expand Down