Skip to content

Commit 9ff1455

Browse files
committed
* Fix detection of unexpected exception initiator.
* Ensure flood publishing test does not use exclusive queue.
1 parent b21af89 commit 9ff1455

File tree

5 files changed

+70
-82
lines changed

5 files changed

+70
-82
lines changed

projects/Test/Common/IntegrationFixture.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -538,7 +538,7 @@ protected ConnectionFactory CreateConnectionFactory()
538538

539539
protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
540540
{
541-
if (args.Initiator == ShutdownInitiator.Peer)
541+
if (args.Initiator != ShutdownInitiator.Application)
542542
{
543543
IConnection conn = (IConnection)sender;
544544
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
@@ -547,7 +547,7 @@ protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args)
547547

548548
protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
549549
{
550-
if (args.Initiator == ShutdownInitiator.Peer)
550+
if (args.Initiator != ShutdownInitiator.Application)
551551
{
552552
_output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}");
553553
}
@@ -556,7 +556,7 @@ protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args
556556

557557
protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
558558
{
559-
if (args.Initiator == ShutdownInitiator.Peer)
559+
if (args.Initiator != ShutdownInitiator.Application)
560560
{
561561
IChannel ch = (IChannel)sender;
562562
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
@@ -565,7 +565,7 @@ protected void HandleChannelShutdown(object sender, ShutdownEventArgs args)
565565

566566
protected void HandleChannelShutdown(IChannel ch, ShutdownEventArgs args, Action<ShutdownEventArgs> a)
567567
{
568-
if (args.Initiator == ShutdownInitiator.Peer)
568+
if (args.Initiator != ShutdownInitiator.Application)
569569
{
570570
_output.WriteLine($"{_testDisplayName} channel {ch.ChannelNumber} shut down: {args}");
571571
}

projects/Test/Integration/TestAsyncConsumer.cs

Lines changed: 23 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -80,21 +80,15 @@ public async Task TestBasicRoundtripConcurrent()
8080
{
8181
HandleConnectionShutdown(_conn, ea, (args) =>
8282
{
83-
if (args.Initiator == ShutdownInitiator.Peer)
84-
{
85-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
86-
}
83+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
8784
});
8885
};
8986

9087
_channel.ChannelShutdown += (o, ea) =>
9188
{
9289
HandleChannelShutdown(_channel, ea, (args) =>
9390
{
94-
if (args.Initiator == ShutdownInitiator.Peer)
95-
{
96-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
97-
}
91+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
9892
});
9993
};
10094

@@ -164,21 +158,15 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
164158
{
165159
HandleConnectionShutdown(_conn, ea, (args) =>
166160
{
167-
if (args.Initiator == ShutdownInitiator.Peer)
168-
{
169-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
170-
}
161+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
171162
});
172163
};
173164

174165
_channel.ChannelShutdown += (o, ea) =>
175166
{
176167
HandleChannelShutdown(_channel, ea, (args) =>
177168
{
178-
if (args.Initiator == ShutdownInitiator.Peer)
179-
{
180-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
181-
}
169+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
182170
});
183171
};
184172

@@ -193,10 +181,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
193181
{
194182
HandleConnectionShutdown(publishConn, ea, (args) =>
195183
{
196-
if (args.Initiator == ShutdownInitiator.Peer)
197-
{
198-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
199-
}
184+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
200185
});
201186
};
202187
using (IChannel publishChannel = await publishConn.CreateChannelAsync())
@@ -205,10 +190,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
205190
{
206191
HandleChannelShutdown(publishChannel, ea, (args) =>
207192
{
208-
if (args.Initiator == ShutdownInitiator.Peer)
209-
{
210-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
211-
}
193+
MaybeSetException(args, publish1SyncSource, publish2SyncSource);
212194
});
213195
};
214196
await publishChannel.ConfirmSelectAsync();
@@ -235,10 +217,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
235217
{
236218
HandleConnectionShutdown(consumeConn, ea, (args) =>
237219
{
238-
if (args.Initiator == ShutdownInitiator.Peer)
239-
{
240-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
241-
}
220+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
242221
});
243222
};
244223
using (IChannel consumeChannel = await consumeConn.CreateChannelAsync())
@@ -247,10 +226,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages()
247226
{
248227
HandleChannelShutdown(consumeChannel, ea, (args) =>
249228
{
250-
if (args.Initiator == ShutdownInitiator.Peer)
251-
{
252-
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
253-
}
229+
MaybeSetException(ea, publish1SyncSource, publish2SyncSource);
254230
});
255231
};
256232

@@ -330,18 +306,15 @@ public async Task TestBasicRejectAsync()
330306
{
331307
HandleConnectionShutdown(_conn, ea, (args) =>
332308
{
333-
if (args.Initiator == ShutdownInitiator.Peer)
334-
{
335-
MaybeSetException(ea, publishSyncSource);
336-
}
309+
MaybeSetException(args, publishSyncSource);
337310
});
338311
};
339312

340313
_channel.ChannelShutdown += (o, ea) =>
341314
{
342315
HandleChannelShutdown(_channel, ea, (args) =>
343316
{
344-
MaybeSetException(ea, publishSyncSource);
317+
MaybeSetException(args, publishSyncSource);
345318
});
346319
};
347320

@@ -427,21 +400,15 @@ public async Task TestBasicAckAsync()
427400
{
428401
HandleConnectionShutdown(_conn, ea, (args) =>
429402
{
430-
if (args.Initiator == ShutdownInitiator.Peer)
431-
{
432-
MaybeSetException(ea, publishSyncSource);
433-
}
403+
MaybeSetException(args, publishSyncSource);
434404
});
435405
};
436406

437407
_channel.ChannelShutdown += (o, ea) =>
438408
{
439409
HandleChannelShutdown(_channel, ea, (args) =>
440410
{
441-
if (args.Initiator == ShutdownInitiator.Peer)
442-
{
443-
MaybeSetException(ea, publishSyncSource);
444-
}
411+
MaybeSetException(args, publishSyncSource);
445412
});
446413
};
447414

@@ -495,21 +462,15 @@ public async Task TestBasicNackAsync()
495462
{
496463
HandleConnectionShutdown(_conn, ea, (args) =>
497464
{
498-
if (args.Initiator == ShutdownInitiator.Peer)
499-
{
500-
MaybeSetException(ea, publishSyncSource);
501-
}
465+
MaybeSetException(ea, publishSyncSource);
502466
});
503467
};
504468

505469
_channel.ChannelShutdown += (o, ea) =>
506470
{
507471
HandleChannelShutdown(_channel, ea, (args) =>
508472
{
509-
if (args.Initiator == ShutdownInitiator.Peer)
510-
{
511-
MaybeSetException(ea, publishSyncSource);
512-
}
473+
MaybeSetException(ea, publishSyncSource);
513474
});
514475
};
515476

@@ -611,19 +572,22 @@ private static void SetException(Exception ex, params TaskCompletionSource<bool>
611572
}
612573
}
613574

614-
private static void MaybeSetException(ShutdownEventArgs ea, params TaskCompletionSource<bool>[] tcsAry)
575+
private static void MaybeSetException(ShutdownEventArgs args, params TaskCompletionSource<bool>[] tcsAry)
615576
{
616-
foreach (TaskCompletionSource<bool> tcs in tcsAry)
577+
if (args.Initiator != ShutdownInitiator.Application)
617578
{
618-
MaybeSetException(ea, tcs);
579+
foreach (TaskCompletionSource<bool> tcs in tcsAry)
580+
{
581+
MaybeSetException(args, tcs);
582+
}
619583
}
620584
}
621585

622-
private static void MaybeSetException(ShutdownEventArgs ea, TaskCompletionSource<bool> tcs)
586+
private static void MaybeSetException(ShutdownEventArgs args, TaskCompletionSource<bool> tcs)
623587
{
624-
if (ea.Initiator == ShutdownInitiator.Peer)
588+
if (args.Initiator != ShutdownInitiator.Application)
625589
{
626-
Exception ex = ea.Exception ?? new Exception(ea.ReplyText);
590+
Exception ex = args.Exception ?? new Exception(args.ReplyText);
627591
tcs.TrySetException(ex);
628592
}
629593
}

projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,9 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
104104
{
105105
HandleChannelShutdown(ch, ea, (args) =>
106106
{
107-
if (args.Initiator == ShutdownInitiator.Peer)
107+
if (args.Initiator != ShutdownInitiator.Application)
108108
{
109-
tcs.TrySetResult(false);
109+
tcs.TrySetException(args.Exception);
110110
}
111111
});
112112
};

projects/Test/Integration/TestFloodPublishing.cs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public async Task TestUnthrottledFloodPublishing()
7070
{
7171
HandleConnectionShutdown(_conn, ea, (args) =>
7272
{
73-
if (args.Initiator == ShutdownInitiator.Peer)
73+
if (args.Initiator != ShutdownInitiator.Application)
7474
{
7575
sawUnexpectedShutdown = true;
7676
}
@@ -81,7 +81,7 @@ public async Task TestUnthrottledFloodPublishing()
8181
{
8282
HandleChannelShutdown(_channel, ea, (args) =>
8383
{
84-
if (args.Initiator == ShutdownInitiator.Peer)
84+
if (args.Initiator != ShutdownInitiator.Application)
8585
{
8686
sawUnexpectedShutdown = true;
8787
}
@@ -130,16 +130,16 @@ public async Task TestMultithreadFloodPublishing()
130130
int publishCount = 4096;
131131
int receivedCount = 0;
132132

133-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
133+
var allMessagesSeenTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
134134

135135
_conn.ConnectionShutdown += (o, ea) =>
136136
{
137137
HandleConnectionShutdown(_conn, ea, (args) =>
138138
{
139-
if (args.Initiator == ShutdownInitiator.Peer)
139+
if (args.Initiator != ShutdownInitiator.Application)
140140
{
141141
receivedCount = -1;
142-
tcs.SetResult(false);
142+
allMessagesSeenTcs.TrySetException(args.Exception);
143143
}
144144
});
145145
};
@@ -148,23 +148,35 @@ public async Task TestMultithreadFloodPublishing()
148148
{
149149
HandleChannelShutdown(_channel, ea, (args) =>
150150
{
151-
if (args.Initiator == ShutdownInitiator.Peer)
151+
if (args.Initiator != ShutdownInitiator.Application)
152152
{
153153
receivedCount = -1;
154-
tcs.SetResult(false);
154+
allMessagesSeenTcs.TrySetException(args.Exception);
155155
}
156156
});
157157
};
158158

159159
QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: string.Empty,
160-
passive: false, durable: false, exclusive: true, autoDelete: false, arguments: null);
160+
passive: false, durable: false, exclusive: false, autoDelete: true, arguments: null);
161161
string queueName = q.QueueName;
162162

163163
Task pub = Task.Run(async () =>
164164
{
165165
bool stop = false;
166166
using (IConnection publishConnection = await _connFactory.CreateConnectionAsync())
167167
{
168+
publishConnection.ConnectionShutdown += (o, ea) =>
169+
{
170+
HandleConnectionShutdown(_conn, ea, (args) =>
171+
{
172+
if (args.Initiator != ShutdownInitiator.Application)
173+
{
174+
receivedCount = -1;
175+
allMessagesSeenTcs.TrySetException(args.Exception);
176+
}
177+
});
178+
};
179+
168180
using (IChannel publishChannel = await publishConnection.CreateChannelAsync())
169181
{
170182
await publishChannel.ConfirmSelectAsync();
@@ -173,10 +185,10 @@ public async Task TestMultithreadFloodPublishing()
173185
{
174186
HandleChannelShutdown(publishChannel, ea, (args) =>
175187
{
176-
if (args.Initiator == ShutdownInitiator.Peer)
188+
if (args.Initiator != ShutdownInitiator.Application)
177189
{
178190
stop = true;
179-
tcs.TrySetResult(false);
191+
allMessagesSeenTcs.TrySetException(args.Exception);
180192
}
181193
});
182194
};
@@ -197,22 +209,34 @@ public async Task TestMultithreadFloodPublishing()
197209
var cts = new CancellationTokenSource(WaitSpan);
198210
CancellationTokenRegistration ctsr = cts.Token.Register(() =>
199211
{
200-
tcs.TrySetResult(false);
212+
allMessagesSeenTcs.TrySetCanceled();
201213
});
202214

203215
try
204216
{
205217
using (IConnection consumeConnection = await _connFactory.CreateConnectionAsync())
206218
{
219+
consumeConnection.ConnectionShutdown += (o, ea) =>
220+
{
221+
HandleConnectionShutdown(_conn, ea, (args) =>
222+
{
223+
if (args.Initiator != ShutdownInitiator.Application)
224+
{
225+
receivedCount = -1;
226+
allMessagesSeenTcs.TrySetException(args.Exception);
227+
}
228+
});
229+
};
230+
207231
using (IChannel consumeChannel = await consumeConnection.CreateChannelAsync())
208232
{
209233
consumeChannel.ChannelShutdown += (o, ea) =>
210234
{
211235
HandleChannelShutdown(consumeChannel, ea, (args) =>
212236
{
213-
if (args.Initiator == ShutdownInitiator.Peer)
237+
if (args.Initiator != ShutdownInitiator.Application)
214238
{
215-
tcs.TrySetResult(false);
239+
allMessagesSeenTcs.TrySetException(args.Exception);
216240
}
217241
});
218242
};
@@ -224,7 +248,7 @@ public async Task TestMultithreadFloodPublishing()
224248
Assert.Equal(message, receivedMessage);
225249
if (Interlocked.Increment(ref receivedCount) == publishCount)
226250
{
227-
tcs.SetResult(true);
251+
allMessagesSeenTcs.SetResult(true);
228252
}
229253
await Task.Yield();
230254
};
@@ -233,7 +257,7 @@ await consumeChannel.BasicConsumeAsync(queue: queueName, autoAck: true,
233257
consumerTag: string.Empty, noLocal: false, exclusive: false,
234258
arguments: null, consumer: consumer);
235259

236-
Assert.True(await tcs.Task);
260+
Assert.True(await allMessagesSeenTcs.Task);
237261
await consumeChannel.CloseAsync();
238262
}
239263

0 commit comments

Comments
 (0)