Skip to content

Commit c58ff9e

Browse files
committed
* Check for unexpected callback exceptions.
1 parent ad1160b commit c58ff9e

15 files changed

+148
-41
lines changed

projects/RabbitMQ.Client/client/impl/RecordedBinding.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,11 @@ public Task RecoverAsync(IChannel channel)
8181

8282
public bool Equals(RecordedBinding other)
8383
{
84-
return _isQueueBinding == other._isQueueBinding && _destination == other._destination && _source == other._source &&
85-
_routingKey == other._routingKey && _arguments == other._arguments;
84+
return _isQueueBinding == other._isQueueBinding &&
85+
_destination == other._destination &&
86+
_source == other._source &&
87+
_routingKey == other._routingKey &&
88+
_arguments == other._arguments;
8689
}
8790

8891
public override bool Equals(object? obj)

projects/Test/Common/IntegrationFixture.cs

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public abstract class IntegrationFixture : IAsyncLifetime
5252
private static readonly bool s_isVerbose = false;
5353
private static int _connectionIdx = 0;
5454

55+
private Exception _connectionCallbackException;
56+
private Exception _channelCallbackException;
57+
5558
protected readonly RabbitMQCtl _rabbitMQCtl;
5659

5760
protected ConnectionFactory _connFactory;
@@ -77,7 +80,12 @@ public abstract class IntegrationFixture : IAsyncLifetime
7780

7881
static IntegrationFixture()
7982
{
83+
84+
#if NET6_0_OR_GREATER
85+
S_Random = Random.Shared;
86+
#else
8087
S_Random = new Random();
88+
#endif
8189
s_isRunningInCI = InitIsRunningInCI();
8290
s_isVerbose = InitIsVerbose();
8391

@@ -146,8 +154,10 @@ public virtual async Task InitializeAsync()
146154

147155
if (IsVerbose)
148156
{
149-
AddCallbackHandlers();
157+
AddCallbackShutdownHandlers();
150158
}
159+
160+
AddCallbackExceptionHandlers();
151161
}
152162

153163
if (_connFactory.AutomaticRecoveryEnabled)
@@ -181,42 +191,100 @@ public virtual async Task DisposeAsync()
181191
_channel = null;
182192
_conn = null;
183193
}
194+
195+
DisposeAssertions();
196+
}
197+
198+
protected virtual void DisposeAssertions()
199+
{
200+
if (_connectionCallbackException != null)
201+
{
202+
Assert.Fail($"unexpected connection callback exception: {_connectionCallbackException}");
203+
}
204+
205+
if (_channelCallbackException != null)
206+
{
207+
Assert.Fail($"unexpected channel callback exception: {_channelCallbackException}");
208+
}
184209
}
185210

186-
protected virtual void AddCallbackHandlers()
211+
protected void AddCallbackExceptionHandlers()
187212
{
188213
if (_conn != null)
189214
{
190215
_conn.CallbackException += (o, ea) =>
191216
{
192-
_output.WriteLine("{0} connection callback exception: {1}",
193-
_testDisplayName, ea.Exception);
217+
_connectionCallbackException = ea.Exception;
218+
219+
if (IsVerbose)
220+
{
221+
try
222+
{
223+
_output.WriteLine("{0} connection callback exception: {1}",
224+
_testDisplayName, ea.Exception);
225+
}
226+
catch (InvalidOperationException)
227+
{
228+
}
229+
}
194230
};
231+
}
195232

233+
if (_channel != null)
234+
{
235+
_channel.CallbackException += (o, ea) =>
236+
{
237+
_channelCallbackException = ea.Exception;
238+
239+
if (IsVerbose)
240+
{
241+
try
242+
{
243+
_output.WriteLine("{0} channel callback exception: {1}",
244+
_testDisplayName, ea.Exception);
245+
}
246+
catch (InvalidOperationException)
247+
{
248+
}
249+
}
250+
};
251+
}
252+
}
253+
254+
protected void AddCallbackShutdownHandlers()
255+
{
256+
if (_conn != null)
257+
{
196258
_conn.ConnectionShutdown += (o, ea) =>
197259
{
198260
HandleConnectionShutdown(_conn, ea, (args) =>
199261
{
200-
_output.WriteLine("{0} connection shutdown, args: {1}",
201-
_testDisplayName, args);
262+
try
263+
{
264+
_output.WriteLine("{0} connection shutdown, args: {1}",
265+
_testDisplayName, args);
266+
}
267+
catch (InvalidOperationException)
268+
{
269+
}
202270
});
203271
};
204272
}
205273

206274
if (_channel != null)
207275
{
208-
_channel.CallbackException += (o, ea) =>
209-
{
210-
_output.WriteLine("{0} channel callback exception: {1}",
211-
_testDisplayName, ea.Exception);
212-
};
213-
214276
_channel.ChannelShutdown += (o, ea) =>
215277
{
216278
HandleChannelShutdown(_channel, ea, (args) =>
217279
{
218-
_output.WriteLine("{0} channel shutdown, args: {1}",
219-
_testDisplayName, args);
280+
try
281+
{
282+
_output.WriteLine("{0} channel shutdown, args: {1}",
283+
_testDisplayName, args);
284+
}
285+
catch (InvalidOperationException)
286+
{
287+
}
220288
});
221289
};
222290
}
@@ -405,6 +473,11 @@ protected static Task AssertRanToCompletion(IEnumerable<Task> tasks)
405473
return DoAssertRanToCompletion(tasks);
406474
}
407475

476+
internal static void AssertRecordedQueues(AutorecoveringConnection c, int n)
477+
{
478+
Assert.Equal(n, c.RecordedQueuesCount);
479+
}
480+
408481
protected static Task WaitAsync(TaskCompletionSource<bool> tcs, string desc)
409482
{
410483
return WaitAsync(tcs, WaitSpan, desc);
@@ -524,11 +597,7 @@ protected static string GetUniqueString(ushort length)
524597
protected static byte[] GetRandomBody(ushort size = 1024)
525598
{
526599
var body = new byte[size];
527-
#if NET6_0_OR_GREATER
528-
Random.Shared.NextBytes(body);
529-
#else
530600
S_Random.NextBytes(body);
531-
#endif
532601
return body;
533602
}
534603

@@ -543,7 +612,7 @@ protected static TaskCompletionSource<bool> PrepareForRecovery(IConnection conn)
543612
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
544613

545614
AutorecoveringConnection aconn = conn as AutorecoveringConnection;
546-
aconn.RecoverySucceeded += (source, ea) => tcs.SetResult(true);
615+
aconn.RecoverySucceeded += (source, ea) => tcs.TrySetResult(true);
547616

548617
return tcs;
549618
}

projects/Test/Common/TestConnectionRecoveryBase.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,6 @@ internal void AssertRecordedExchanges(AutorecoveringConnection c, int n)
108108
Assert.Equal(n, c.RecordedExchangesCount);
109109
}
110110

111-
internal void AssertRecordedQueues(AutorecoveringConnection c, int n)
112-
{
113-
Assert.Equal(n, c.RecordedQueuesCount);
114-
}
115-
116111
internal Task<AutorecoveringConnection> CreateAutorecoveringConnectionAsync()
117112
{
118113
return CreateAutorecoveringConnectionAsync(RecoveryInterval);

projects/Test/Integration/ConnectionRecovery/TestConsumerRecovery.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ namespace Test.Integration.ConnectionRecovery
4040
{
4141
public class TestConsumerRecovery : TestConnectionRecoveryBase
4242
{
43-
public TestConsumerRecovery(ITestOutputHelper output) : base(output)
43+
public TestConsumerRecovery(ITestOutputHelper output)
44+
: base(output, dispatchConsumersAsync: true)
4445
{
4546
}
4647

@@ -57,7 +58,7 @@ public async Task TestConsumerRecoveryWithManyConsumers()
5758
}
5859

5960
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
60-
((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.SetResult(true);
61+
((AutorecoveringConnection)_conn).ConsumerTagChangeAfterRecovery += (prev, current) => tcs.TrySetResult(true);
6162

6263
await CloseAndWaitForRecoveryAsync();
6364
await WaitAsync(tcs, "consumer tag change after recovery");

projects/Test/Integration/ConnectionRecovery/TestDeclaration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
119119
{
120120
string q = Guid.NewGuid().ToString();
121121
await _channel.QueueDeclareAsync(q, false, false, true);
122-
var dummy = new AsyncEventingBasicConsumer(_channel);
122+
var dummy = new EventingBasicConsumer(_channel);
123123
string tag = await _channel.BasicConsumeAsync(q, true, dummy);
124124
await _channel.BasicCancelAsync(tag);
125125
}

projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public async Task TestRecoveryEventHandlersOnChannel()
8181
public async Task TestRecoveringConsumerHandlerOnConnection(int iterations)
8282
{
8383
string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName;
84-
var cons = new AsyncEventingBasicConsumer(_channel);
84+
var cons = new EventingBasicConsumer(_channel);
8585
await _channel.BasicConsumeAsync(q, true, cons);
8686

8787
int counter = 0;
@@ -100,7 +100,7 @@ public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePas
100100
{
101101
var myArgs = new Dictionary<string, object> { { "first-argument", "some-value" } };
102102
string q = (await _channel.QueueDeclareAsync(GenerateQueueName(), false, false, false)).QueueName;
103-
var cons = new AsyncEventingBasicConsumer(_channel);
103+
var cons = new EventingBasicConsumer(_channel);
104104
string expectedCTag = await _channel.BasicConsumeAsync(cons, q, arguments: myArgs);
105105

106106
bool ctagMatches = false;

projects/Test/Integration/ConnectionRecovery/TestRecoveryWithDeletedEntities.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ public async Task TestThatDeletedQueueBindingsDontReappearOnRecovery()
100100
string ex_amq_fanout = "amq.fanout";
101101
string ex_test = GenerateExchangeName();
102102

103-
await _channel.ExchangeDeclareAsync(ex_test, "fanout");
103+
await _channel.ExchangeDeclareAsync(ex_test, ExchangeType.Fanout);
104104
await _channel.ExchangeBindAsync(destination: ex_amq_fanout, source: ex_test, routingKey: "");
105105
await _channel.QueueBindAsync(q, ex_amq_fanout, "");
106106
await _channel.QueueUnbindAsync(q, ex_amq_fanout, "");

projects/Test/Integration/ConnectionRecovery/TestRpcAfterRecovery.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ namespace Test.Integration.ConnectionRecovery
4141
{
4242
public class TestRpcAfterRecovery : TestConnectionRecoveryBase
4343
{
44-
public TestRpcAfterRecovery(ITestOutputHelper output) : base(output)
44+
public TestRpcAfterRecovery(ITestOutputHelper output)
45+
: base(output, dispatchConsumersAsync: true)
4546
{
4647
}
4748

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -518,5 +518,20 @@ public async Task NonAsyncConsumerShouldThrowInvalidOperationException()
518518
}
519519
Assert.True(sawException, "did not see expected InvalidOperationException");
520520
}
521+
522+
[Fact]
523+
public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
524+
{
525+
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
526+
for (int i = 0; i < 1000; i++)
527+
{
528+
string q = Guid.NewGuid().ToString();
529+
await _channel.QueueDeclareAsync(q, false, false, true);
530+
var dummy = new AsyncEventingBasicConsumer(_channel);
531+
string tag = await _channel.BasicConsumeAsync(q, true, dummy);
532+
await _channel.BasicCancelAsync(tag);
533+
}
534+
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
535+
}
521536
}
522537
}

projects/Test/Integration/TestAsyncConsumerExceptions.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,20 @@ public class TestAsyncConsumerExceptions : IntegrationFixture
4444
private static readonly Exception TestException = new Exception("oops");
4545

4646
public TestAsyncConsumerExceptions(ITestOutputHelper output)
47-
: base(output, dispatchConsumersAsync: true, consumerDispatchConcurrency: 1)
47+
: base(output,
48+
dispatchConsumersAsync: true,
49+
consumerDispatchConcurrency: 1)
4850
{
4951
}
5052

53+
protected override void DisposeAssertions()
54+
{
55+
/*
56+
* Note: don't do anything since these tests expect callback
57+
* exceptions
58+
*/
59+
}
60+
5161
[Fact]
5262
public Task TestCancelNotificationExceptionHandling()
5363
{

projects/Test/Integration/TestConnectionShutdown.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,7 @@ public async Task TestDisposedWithSocketClosedOutOfBand()
105105
_conn.Dispose();
106106
_conn = null;
107107

108-
TimeSpan waitSpan = TimeSpan.FromSeconds(3);
109-
if (IsRunningInCI)
110-
{
111-
waitSpan = TimeSpan.FromSeconds(10);
112-
}
108+
TimeSpan waitSpan = TimeSpan.FromSeconds(10);
113109
await WaitAsync(tcs, waitSpan, "channel shutdown");
114110
}
115111

projects/Test/Integration/TestConsumerCancelNotify.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public async Task TestCorrectConsumerTag()
7676
consumer.ConsumerCancelled += (sender, args) =>
7777
{
7878
notifiedConsumerTag = args.ConsumerTags.First();
79-
_tcs.SetResult(true);
79+
_tcs.TrySetResult(true);
8080
};
8181

8282
await _channel.QueueDeleteAsync(q1);

projects/Test/Integration/TestConsumerExceptions.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ namespace Test.Integration
3939
{
4040
public class TestConsumerExceptions : IntegrationFixture
4141
{
42+
protected override void DisposeAssertions()
43+
{
44+
/*
45+
* Note: don't do anything since these tests expect callback
46+
* exceptions
47+
*/
48+
}
49+
4250
private class ConsumerFailingOnDelivery : DefaultBasicConsumer
4351
{
4452
public ConsumerFailingOnDelivery(IChannel channel) : base(channel)

projects/Test/Integration/TestMainLoop.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,14 @@ public TestMainLoop(ITestOutputHelper output) : base(output)
4444
{
4545
}
4646

47+
protected override void DisposeAssertions()
48+
{
49+
/*
50+
* Note: don't do anything since these tests expect callback
51+
* exceptions
52+
*/
53+
}
54+
4755
private sealed class FaultyConsumer : DefaultBasicConsumer
4856
{
4957
public FaultyConsumer(IChannel channel) : base(channel) { }

projects/Test/Integration/TestQueueDeclare.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ async Task f()
9797
// sleep for a random amount of time to increase the chances
9898
// of thread interleaving. MK.
9999
await Task.Delay(S_Random.Next(5, 50));
100-
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty, false, false, false);
100+
QueueDeclareOk r = await _channel.QueueDeclareAsync(queue: string.Empty,
101+
durable: false, exclusive: true, autoDelete: false);
101102
string queueName = r.QueueName;
102103
await _channel.QueueBindAsync(queue: queueName, exchange: "amq.fanout", routingKey: queueName);
103104
queueNames.Add(queueName);

0 commit comments

Comments
 (0)