Skip to content

Commit b3e02b9

Browse files
committed
Add test that creates IChannel within async consumer callback
Fixes #650
1 parent 2a48e6c commit b3e02b9

File tree

1 file changed

+67
-0
lines changed

1 file changed

+67
-0
lines changed

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,73 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
564564
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
565565
}
566566

567+
[Fact]
568+
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
569+
{
570+
string exchangeName = GenerateExchangeName();
571+
string queue1Name = GenerateQueueName();
572+
string queue2Name = GenerateQueueName();
573+
574+
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
575+
using var cts = new CancellationTokenSource(WaitSpan);
576+
using CancellationTokenRegistration ctr = cts.Token.Register(() =>
577+
{
578+
tcs.SetCanceled();
579+
});
580+
581+
_conn.ConnectionShutdown += (o, ea) =>
582+
{
583+
HandleConnectionShutdown(_conn, ea, (args) =>
584+
{
585+
MaybeSetException(ea, tcs);
586+
});
587+
};
588+
589+
_channel.ChannelShutdown += (o, ea) =>
590+
{
591+
HandleChannelShutdown(_channel, ea, (args) =>
592+
{
593+
MaybeSetException(ea, tcs);
594+
});
595+
};
596+
597+
// queue1 -> produce click to queue2
598+
// click -> exchange
599+
// queue2 -> consume click from queue1
600+
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
601+
await _channel.QueueDeclareAsync(queue1Name);
602+
await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name);
603+
await _channel.QueueDeclareAsync(queue2Name);
604+
await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name);
605+
606+
var consumer1 = new AsyncEventingBasicConsumer(_channel);
607+
consumer1.Received += async (sender, args) =>
608+
{
609+
using (IChannel innerChannel = await _conn.CreateChannelAsync())
610+
{
611+
await innerChannel.ConfirmSelectAsync();
612+
await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true);
613+
await innerChannel.WaitForConfirmsOrDieAsync();
614+
await innerChannel.CloseAsync();
615+
}
616+
};
617+
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);
618+
619+
var consumer2 = new AsyncEventingBasicConsumer(_channel);
620+
consumer2.Received += async (sender, args) =>
621+
{
622+
tcs.TrySetResult(true);
623+
await Task.Yield();
624+
};
625+
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);
626+
627+
await _channel.ConfirmSelectAsync();
628+
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
629+
await _channel.WaitForConfirmsOrDieAsync();
630+
631+
Assert.True(await tcs.Task);
632+
}
633+
567634
private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
568635
{
569636
foreach (TaskCompletionSource<bool> tcs in tcsAry)

0 commit comments

Comments
 (0)