Skip to content

Commit 3bb4665

Browse files
committed
* Ensure exceptions make into inner exception for HardProtocolException
1 parent cbdbcce commit 3bb4665

File tree

11 files changed

+63
-57
lines changed

11 files changed

+63
-57
lines changed

projects/RabbitMQ.Client/Events/ShutdownEventArgs.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string r
7272
/// <summary>
7373
/// Construct a <see cref="ShutdownEventArgs"/> with the given parameters.
7474
/// </summary>
75-
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText, Exception exception, CancellationToken cancellationToken = default)
75+
public ShutdownEventArgs(ShutdownInitiator initiator, ushort replyCode, string replyText,
76+
Exception exception, CancellationToken cancellationToken = default)
7677
: this(initiator, replyCode, replyText, 0, 0, cancellationToken: cancellationToken)
7778
{
7879
_exception = exception ?? throw new ArgumentNullException(nameof(exception));

projects/RabbitMQ.Client/Exceptions/OperationInterruptedException.cs

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,35 +45,30 @@ namespace RabbitMQ.Client.Exceptions
4545
public class OperationInterruptedException
4646
: RabbitMQClientException
4747
{
48-
///<summary>Construct an OperationInterruptedException with
49-
///the passed-in explanation, if any.</summary>
50-
public OperationInterruptedException(ShutdownEventArgs? reason)
51-
: base(reason is null ? "The AMQP operation was interrupted" :
52-
$"The AMQP operation was interrupted: {reason}")
48+
///<summary>
49+
///Construct an OperationInterruptedException
50+
///</summary>
51+
public OperationInterruptedException() : base("The AMQP operation was interrupted")
5352
{
54-
ShutdownReason = reason;
55-
}
5653

57-
///<summary>Construct an OperationInterruptedException with
58-
///the passed-in explanation and prefix, if any.</summary>
59-
public OperationInterruptedException(ShutdownEventArgs? reason, string prefix)
60-
: base(reason is null ? $"{prefix}: The AMQP operation was interrupted" :
61-
$"{prefix}: The AMQP operation was interrupted: {reason}")
62-
{
63-
ShutdownReason = reason;
6454
}
65-
66-
protected OperationInterruptedException()
55+
///<summary>
56+
///Construct an OperationInterruptedException with
57+
///the passed-in explanation, if any.
58+
///</summary>
59+
public OperationInterruptedException(ShutdownEventArgs reason)
60+
: base($"The AMQP operation was interrupted: {reason}", reason.Exception)
6761
{
62+
ShutdownReason = reason;
6863
}
6964

70-
protected OperationInterruptedException(string message) : base(message)
71-
{
72-
}
7365

74-
protected OperationInterruptedException(string message, Exception inner)
75-
: base(message, inner)
66+
///<summary>Construct an OperationInterruptedException with
67+
///the passed-in explanation and prefix, if any.</summary>
68+
public OperationInterruptedException(ShutdownEventArgs reason, string prefix)
69+
: base($"{prefix}: The AMQP operation was interrupted: {reason}", reason.Exception)
7670
{
71+
ShutdownReason = reason;
7772
}
7873

7974
///<summary>Retrieves the explanation for the shutdown. May

projects/RabbitMQ.Client/Exceptions/ProtocolViolationException.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ namespace RabbitMQ.Client.Exceptions
3636
[Serializable]
3737
public class ProtocolViolationException : RabbitMQClientException
3838
{
39-
public ProtocolViolationException(string message) : base(message)
39+
public ProtocolViolationException() : base()
4040
{
4141
}
42-
public ProtocolViolationException(string message, Exception inner) : base(message, inner)
42+
43+
public ProtocolViolationException(string message) : base(message)
4344
{
4445
}
45-
public ProtocolViolationException()
46+
47+
public ProtocolViolationException(string message, Exception inner) : base(message, inner)
4648
{
4749
}
4850
}

projects/RabbitMQ.Client/Exceptions/RabbitMQClientException.cs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,21 @@ namespace RabbitMQ.Client.Exceptions
3737
public abstract class RabbitMQClientException : Exception
3838
{
3939
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class.</summary>
40-
protected RabbitMQClientException()
40+
protected RabbitMQClientException() : base()
4141
{
42-
4342
}
4443

4544
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message.</summary>
4645
/// <param name="message">The message that describes the error. </param>
4746
protected RabbitMQClientException(string message) : base(message)
4847
{
49-
5048
}
5149

5250
/// <summary>Initializes a new instance of the <see cref="RabbitMQClientException" /> class with a specified error message and a reference to the inner exception that is the cause of this exception.</summary>
5351
/// <param name="message">The error message that explains the reason for the exception. </param>
5452
/// <param name="innerException">The exception that is the cause of the current exception, or a null reference (Nothing in Visual Basic) if no inner exception is specified. </param>
55-
protected RabbitMQClientException(string message, Exception innerException) : base(message, innerException)
53+
protected RabbitMQClientException(string message, Exception? innerException) : base(message, innerException)
5654
{
57-
5855
}
5956
}
6057
}

projects/RabbitMQ.Client/Exceptions/UnexpectedMethodException.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
namespace RabbitMQ.Client.Exceptions
3636
{
3737
/// <summary>
38+
/// TODO WHY IS THIS UNREFERENCED?
3839
/// Thrown when the channel receives an RPC reply that it wasn't expecting.
3940
/// </summary>
4041
[Serializable]

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,13 @@ public TimeSpan ContinuationTimeout
7272
}
7373

7474
public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel innerChannel,
75-
ushort consumerDispatchConcurrency)
75+
ushort consumerDispatchConcurrency, bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
7676
{
7777
_connection = conn;
7878
_innerChannel = innerChannel;
7979
_consumerDispatchConcurrency = consumerDispatchConcurrency;
80+
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
81+
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
8082
}
8183

8284
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -348,12 +350,16 @@ public Task BasicQosAsync(uint prefetchSize, ushort prefetchCount, bool global,
348350
return _innerChannel.BasicQosAsync(prefetchSize, prefetchCount, global, cancellationToken);
349351
}
350352

351-
public async Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default)
353+
public Task ConfirmSelectAsync(bool publisherConfirmationTrackingEnabled = false, CancellationToken cancellationToken = default)
352354
{
355+
/*
356+
* Note:
357+
* No need to pass this on to InnerChannel, as confirms will have already
358+
* been enabled
359+
*/
353360
_publisherConfirmationsEnabled = true;
354361
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
355-
await InnerChannel.ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
356-
.ConfigureAwait(false);
362+
return Task.CompletedTask;
357363
}
358364

359365
public async Task ExchangeBindAsync(string destination, string source, string routingKey,

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -269,13 +269,8 @@ public async Task<IChannel> CreateChannelAsync(bool publisherConfirmationsEnable
269269
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled, cdc, cancellationToken)
270270
.ConfigureAwait(false);
271271

272-
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc);
273-
if (publisherConfirmationsEnabled)
274-
{
275-
// TODO yes, this is necessary, not sure why
276-
await autorecoveringChannel.ConfirmSelectAsync(publisherConfirmationTrackingEnabled, cancellationToken)
277-
.ConfigureAwait(false);
278-
}
272+
var autorecoveringChannel = new AutorecoveringChannel(this, recoveryAwareChannel, cdc,
273+
publisherConfirmationsEnabled, publisherConfirmationTrackingEnabled);
279274
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
280275
.ConfigureAwait(false);
281276
return autorecoveringChannel;

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,6 @@ RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(strin
278278
RabbitMQ.Client.Exceptions.MalformedFrameException.MalformedFrameException(string message, bool canShutdownCleanly) -> void
279279
RabbitMQ.Client.Exceptions.OperationInterruptedException
280280
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException() -> void
281-
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message) -> void
282-
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(string message, System.Exception inner) -> void
283281
RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.set -> void
284282
RabbitMQ.Client.Exceptions.PacketNotRecognizedException
285283
RabbitMQ.Client.Exceptions.PacketNotRecognizedException.PacketNotRecognizedException(int transportHigh, int transportLow, int serverMajor, int serverMinor) -> void
@@ -299,11 +297,9 @@ RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ProtocolVersionMisma
299297
RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMajor.get -> int
300298
RabbitMQ.Client.Exceptions.ProtocolVersionMismatchException.ServerMinor.get -> int
301299
RabbitMQ.Client.Exceptions.ProtocolViolationException
302-
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
303300
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message) -> void
304301
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException(string message, System.Exception inner) -> void
305302
RabbitMQ.Client.Exceptions.RabbitMQClientException
306-
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
307303
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message) -> void
308304
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException(string message, System.Exception innerException) -> void
309305
RabbitMQ.Client.Exceptions.SyntaxErrorException
@@ -884,8 +880,6 @@ RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.Shutd
884880
RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, System.Exception! exception, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
885881
RabbitMQ.Client.Events.ShutdownEventArgs.ShutdownEventArgs(RabbitMQ.Client.ShutdownInitiator initiator, ushort replyCode, string! replyText, ushort classId, ushort methodId, object? cause = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
886882
RabbitMQ.Client.Exceptions.AlreadyClosedException.AlreadyClosedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
887-
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason) -> void
888-
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs? reason, string! prefix) -> void
889883
RabbitMQ.Client.Exceptions.OperationInterruptedException.ShutdownReason.get -> RabbitMQ.Client.Events.ShutdownEventArgs?
890884
RabbitMQ.Client.IAsyncBasicConsumer.HandleChannelShutdownAsync(object! channel, RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> System.Threading.Tasks.Task!
891885
RabbitMQ.Client.IChannel.ChannelShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ShutdownEventArgs!>!
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1+
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason) -> void
2+
RabbitMQ.Client.Exceptions.OperationInterruptedException.OperationInterruptedException(RabbitMQ.Client.Events.ShutdownEventArgs! reason, string! prefix) -> void
3+
RabbitMQ.Client.Exceptions.ProtocolViolationException.ProtocolViolationException() -> void
4+
RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() -> void
15
RabbitMQ.Client.IConnection.CreateChannelAsync(bool publisherConfirmationsEnabled = false, bool publisherConfirmationTrackingEnabled = false, ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ static Task<IConnection> CreateConnectionAsync()
5454

5555
static async Task PublishMessagesIndividuallyAsync()
5656
{
57-
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages individually and handling confirms all at once");
57+
Console.WriteLine($"{DateTime.Now} [INFO] publishing {MESSAGE_COUNT:N0} messages and handling confirms per-message");
5858

5959
await using IConnection connection = await CreateConnectionAsync();
6060
await using IChannel channel = await connection.CreateChannelAsync(publisherConfirmationsEnabled: true,
@@ -90,32 +90,38 @@ static async Task PublishMessagesInBatchAsync()
9090
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
9191
string queueName = queueDeclareResult.QueueName;
9292

93-
int batchSize = 100;
93+
int batchSize = 500;
9494
int outstandingMessageCount = 0;
9595

9696
var sw = new Stopwatch();
9797
sw.Start();
9898

99-
var publishTasks = new List<Task>();
99+
var publishTasks = new List<ValueTask>();
100100
for (int i = 0; i < MESSAGE_COUNT; i++)
101101
{
102102
byte[] body = Encoding.UTF8.GetBytes(i.ToString());
103-
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body).AsTask());
103+
publishTasks.Add(channel.BasicPublishAsync(exchange: string.Empty, routingKey: queueName, body: body));
104104
outstandingMessageCount++;
105105

106106
if (outstandingMessageCount == batchSize)
107107
{
108-
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
109-
await Task.WhenAll(publishTasks).WaitAsync(cts.Token);
108+
foreach (ValueTask vt in publishTasks)
109+
{
110+
await vt;
111+
}
110112
publishTasks.Clear();
111113
outstandingMessageCount = 0;
112114
}
113115
}
114116

115-
if (outstandingMessageCount > 0)
117+
if (publishTasks.Count > 0)
116118
{
117-
// using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
118-
// await channel.WaitForConfirmsOrDieAsync(cts.Token);
119+
foreach (ValueTask vt in publishTasks)
120+
{
121+
await vt;
122+
}
123+
publishTasks.Clear();
124+
outstandingMessageCount = 0;
119125
}
120126

121127
sw.Stop();

projects/Test/Integration/TestBasicPublish.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.Threading.Tasks;
3636
using RabbitMQ.Client;
3737
using RabbitMQ.Client.Events;
38+
using RabbitMQ.Client.Exceptions;
3839
using Xunit;
3940
using Xunit.Abstractions;
4041
using Xunit.Sdk;
@@ -247,7 +248,9 @@ public async Task TestMaxInboundMessageBodySize()
247248
string tag = await channel.BasicConsumeAsync(q.QueueName, true, consumer);
248249

249250
await channel.BasicPublishAsync("", q.QueueName, msg0);
250-
await channel.BasicPublishAsync("", q.QueueName, msg1);
251+
AlreadyClosedException ex = await Assert.ThrowsAsync<AlreadyClosedException>(() =>
252+
channel.BasicPublishAsync("", q.QueueName, msg1).AsTask());
253+
Assert.IsType<MalformedFrameException>(ex.InnerException);
251254
Assert.True(await tcs.Task);
252255

253256
Assert.Equal(1, count);
@@ -259,6 +262,7 @@ public async Task TestMaxInboundMessageBodySize()
259262
try
260263
{
261264
await channel.CloseAsync();
265+
await channel.DisposeAsync();
262266
}
263267
catch (Exception chex)
264268
{
@@ -272,6 +276,7 @@ public async Task TestMaxInboundMessageBodySize()
272276
try
273277
{
274278
await conn.CloseAsync();
279+
await conn.DisposeAsync();
275280
}
276281
catch (Exception connex)
277282
{

0 commit comments

Comments
 (0)