Skip to content

Commit ccf0764

Browse files
committed
Make event handlers for basic.ack, basic.nack and basic.return async.
1 parent 5326343 commit ccf0764

File tree

10 files changed

+117
-50
lines changed

10 files changed

+117
-50
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,10 +426,7 @@ RabbitMQ.Client.IBasicProperties.UserId.get -> string
426426
RabbitMQ.Client.IBasicProperties.UserId.set -> void
427427
RabbitMQ.Client.IChannel
428428
RabbitMQ.Client.IChannel.BasicAckAsync(ulong deliveryTag, bool multiple, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
429-
RabbitMQ.Client.IChannel.BasicAcks -> System.EventHandler<RabbitMQ.Client.Events.BasicAckEventArgs>
430429
RabbitMQ.Client.IChannel.BasicNackAsync(ulong deliveryTag, bool multiple, bool requeue, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
431-
RabbitMQ.Client.IChannel.BasicNacks -> System.EventHandler<RabbitMQ.Client.Events.BasicNackEventArgs>
432-
RabbitMQ.Client.IChannel.BasicReturn -> System.EventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs>
433430
RabbitMQ.Client.IChannel.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
434431
RabbitMQ.Client.IChannel.ChannelNumber.get -> int
435432
RabbitMQ.Client.IChannel.ChannelShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
@@ -895,3 +892,7 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst
895892
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
896893
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897894
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
895+
RabbitMQ.Client.IChannel.BasicAcksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicAckEventArgs!>!
896+
RabbitMQ.Client.IChannel.BasicNacksAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicNackEventArgs!>!
897+
RabbitMQ.Client.IChannel.BasicReturnAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.BasicReturnEventArgs!>!
898+
RabbitMQ.Client.IChannel.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!

projects/RabbitMQ.Client/client/api/IChannel.cs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,17 +106,17 @@ public interface IChannel : IDisposable
106106
/// <summary>
107107
/// Signalled when a Basic.Ack command arrives from the broker.
108108
/// </summary>
109-
event EventHandler<BasicAckEventArgs> BasicAcks;
109+
event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync;
110110

111111
/// <summary>
112112
/// Signalled when a Basic.Nack command arrives from the broker.
113113
/// </summary>
114-
event EventHandler<BasicNackEventArgs> BasicNacks;
114+
event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync;
115115

116116
/// <summary>
117117
/// Signalled when a Basic.Return command arrives from the broker.
118118
/// </summary>
119-
event EventHandler<BasicReturnEventArgs> BasicReturn;
119+
event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync;
120120

121121
/// <summary>
122122
/// Signalled when an exception occurs in a callback invoked by the channel.
@@ -127,6 +127,15 @@ public interface IChannel : IDisposable
127127
/// </summary>
128128
event EventHandler<CallbackExceptionEventArgs> CallbackException;
129129

130+
/// <summary>
131+
/// Signalled when an exception occurs in an asynchronous callback invoked by the channel.
132+
/// Examples of cases where this event will be signalled
133+
/// include exceptions thrown in <see cref="IChannel.BasicAcksAsync"/>,
134+
/// <see cref="IChannel.BasicNacksAsync"/>,
135+
/// <see cref="IChannel.BasicReturnAsync"/> event handlers, etc.
136+
/// </summary>
137+
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;
138+
130139
event EventHandler<FlowControlEventArgs> FlowControl;
131140

132141
/// <summary>
@@ -184,7 +193,7 @@ Task<string> BasicConsumeAsync(string queue, bool autoAck, string consumerTag, b
184193
/// <summary>
185194
/// Asynchronously retrieve an individual message, if
186195
/// one is available; returns null if the server answers that
187-
/// no messages are currently available. See also <see cref="IChannel.BasicAckAsync" />.
196+
/// no messages are currently available. See also <see cref="IChannel.BasicAckAsync(ulong, bool, CancellationToken)"/>
188197
/// </summary>
189198
/// <param name="queue">The queue.</param>
190199
/// <param name="autoAck">If set to <c>true</c>, automatically ack the message.</param>
@@ -268,7 +277,9 @@ Task CloseAsync(ShutdownEventArgs reason, bool abort,
268277
/// <summary>
269278
/// Asynchronously enable publisher confirmations.
270279
/// </summary>
271-
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via <see cref="BasicAcks"/> and <see cref="BasicNacks"/> yourself.</param>
280+
/// <param name="trackConfirmations">Set to <c>false</c> if tracking via
281+
/// <see cref="IChannel.BasicAckAsync(ulong, bool, CancellationToken)"/> and
282+
/// <see cref="IChannel.BasicNackAsync(ulong, bool, bool, CancellationToken)"/> yourself.</param>
272283
/// <param name="cancellationToken">CancellationToken for this operation.</param>
273284
Task ConfirmSelectAsync(bool trackConfirmations = true,
274285
CancellationToken cancellationToken = default);

projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,22 +79,22 @@ public AutorecoveringChannel(AutorecoveringConnection conn, RecoveryAwareChannel
7979
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8080
}
8181

82-
public event EventHandler<BasicAckEventArgs> BasicAcks
82+
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
8383
{
84-
add => InnerChannel.BasicAcks += value;
85-
remove => InnerChannel.BasicAcks -= value;
84+
add => InnerChannel.BasicAcksAsync += value;
85+
remove => InnerChannel.BasicAcksAsync -= value;
8686
}
8787

88-
public event EventHandler<BasicNackEventArgs> BasicNacks
88+
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
8989
{
90-
add => InnerChannel.BasicNacks += value;
91-
remove => InnerChannel.BasicNacks -= value;
90+
add => InnerChannel.BasicNacksAsync += value;
91+
remove => InnerChannel.BasicNacksAsync -= value;
9292
}
9393

94-
public event EventHandler<BasicReturnEventArgs> BasicReturn
94+
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
9595
{
96-
add => InnerChannel.BasicReturn += value;
97-
remove => InnerChannel.BasicReturn -= value;
96+
add => InnerChannel.BasicReturnAsync += value;
97+
remove => InnerChannel.BasicReturnAsync -= value;
9898
}
9999

100100
public event EventHandler<CallbackExceptionEventArgs> CallbackException
@@ -103,6 +103,12 @@ public event EventHandler<CallbackExceptionEventArgs> CallbackException
103103
remove => InnerChannel.CallbackException -= value;
104104
}
105105

106+
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
107+
{
108+
add => InnerChannel.CallbackExceptionAsync += value;
109+
remove => InnerChannel.CallbackExceptionAsync -= value;
110+
}
111+
106112
public event EventHandler<FlowControlEventArgs> FlowControl
107113
{
108114
add { InnerChannel.FlowControl += value; }

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,16 @@ protected ChannelBase(ConnectionConfig config, ISession session,
7979
ContinuationTimeout = config.ContinuationTimeout;
8080
ConsumerDispatcher = new AsyncConsumerDispatcher(this,
8181
perChannelConsumerDispatchConcurrency.GetValueOrDefault(config.ConsumerDispatchConcurrency));
82-
Action<Exception, string> onException = (exception, context) =>
82+
83+
void onException(Exception exception, string context) =>
8384
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
84-
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
85-
_basicNacksWrapper = new EventingWrapper<BasicNackEventArgs>("OnBasicNack", onException);
86-
_basicReturnWrapper = new EventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onException);
85+
86+
Task onExceptionAsync(Exception exception, string context) =>
87+
OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
88+
89+
_basicAcksWrapper = new AsyncEventingWrapper<BasicAckEventArgs>("OnBasicAck", onExceptionAsync);
90+
_basicNacksWrapper = new AsyncEventingWrapper<BasicNackEventArgs>("OnBasicNack", onExceptionAsync);
91+
_basicReturnWrapper = new AsyncEventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onExceptionAsync);
8792
_callbackExceptionWrapper =
8893
new EventingWrapper<CallbackExceptionEventArgs>(string.Empty, (exception, context) => { });
8994
_flowControlWrapper = new EventingWrapper<FlowControlEventArgs>("OnFlowControl", onException);
@@ -97,44 +102,46 @@ protected ChannelBase(ConnectionConfig config, ISession session,
97102
internal TimeSpan HandshakeContinuationTimeout { get; set; } = TimeSpan.FromSeconds(10);
98103
public TimeSpan ContinuationTimeout { get; set; }
99104

100-
public event EventHandler<BasicAckEventArgs> BasicAcks
105+
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
101106
{
102107
add => _basicAcksWrapper.AddHandler(value);
103108
remove => _basicAcksWrapper.RemoveHandler(value);
104109
}
110+
private AsyncEventingWrapper<BasicAckEventArgs> _basicAcksWrapper;
105111

106-
private EventingWrapper<BasicAckEventArgs> _basicAcksWrapper;
107-
108-
public event EventHandler<BasicNackEventArgs> BasicNacks
112+
public event AsyncEventHandler<BasicNackEventArgs> BasicNacksAsync
109113
{
110114
add => _basicNacksWrapper.AddHandler(value);
111115
remove => _basicNacksWrapper.RemoveHandler(value);
112116
}
117+
private AsyncEventingWrapper<BasicNackEventArgs> _basicNacksWrapper;
113118

114-
private EventingWrapper<BasicNackEventArgs> _basicNacksWrapper;
115-
116-
public event EventHandler<BasicReturnEventArgs> BasicReturn
119+
public event AsyncEventHandler<BasicReturnEventArgs> BasicReturnAsync
117120
{
118121
add => _basicReturnWrapper.AddHandler(value);
119122
remove => _basicReturnWrapper.RemoveHandler(value);
120123
}
121-
122-
private EventingWrapper<BasicReturnEventArgs> _basicReturnWrapper;
124+
private AsyncEventingWrapper<BasicReturnEventArgs> _basicReturnWrapper;
123125

124126
public event EventHandler<CallbackExceptionEventArgs> CallbackException
125127
{
126128
add => _callbackExceptionWrapper.AddHandler(value);
127129
remove => _callbackExceptionWrapper.RemoveHandler(value);
128130
}
129-
130131
private EventingWrapper<CallbackExceptionEventArgs> _callbackExceptionWrapper;
131132

133+
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync
134+
{
135+
add => _callbackExceptionAsyncWrapper.AddHandler(value);
136+
remove => _callbackExceptionAsyncWrapper.RemoveHandler(value);
137+
}
138+
private AsyncEventingWrapper<CallbackExceptionEventArgs> _callbackExceptionAsyncWrapper;
139+
132140
public event EventHandler<FlowControlEventArgs> FlowControl
133141
{
134142
add => _flowControlWrapper.AddHandler(value);
135143
remove => _flowControlWrapper.RemoveHandler(value);
136144
}
137-
138145
private EventingWrapper<FlowControlEventArgs> _flowControlWrapper;
139146

140147
public event EventHandler<ShutdownEventArgs> ChannelShutdown
@@ -152,15 +159,13 @@ public event EventHandler<ShutdownEventArgs> ChannelShutdown
152159
}
153160
remove => _channelShutdownWrapper.RemoveHandler(value);
154161
}
155-
156162
private EventingWrapper<ShutdownEventArgs> _channelShutdownWrapper;
157163

158164
public event EventHandler<EventArgs> Recovery
159165
{
160166
add => _recoveryWrapper.AddHandler(value);
161167
remove => _recoveryWrapper.RemoveHandler(value);
162168
}
163-
164169
private EventingWrapper<EventArgs> _recoveryWrapper;
165170

166171
internal void RunRecoveryEventHandlers(object sender)
@@ -467,6 +472,11 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
467472
_callbackExceptionWrapper.Invoke(this, args);
468473
}
469474

475+
internal Task OnCallbackExceptionAsync(CallbackExceptionEventArgs args)
476+
{
477+
return _callbackExceptionAsyncWrapper.InvokeAsync(this, args);
478+
}
479+
470480
///<summary>Broadcasts notification of the final shutdown of the channel.</summary>
471481
///<remarks>
472482
///<para>
@@ -572,7 +582,8 @@ protected async Task<bool> HandleBasicAck(IncomingCommand cmd, CancellationToken
572582
if (!_basicAcksWrapper.IsEmpty)
573583
{
574584
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple);
575-
_basicAcksWrapper.Invoke(this, args);
585+
await _basicAcksWrapper.InvokeAsync(this, args)
586+
.ConfigureAwait(false);
576587
}
577588

578589
await HandleAckNack(ack._deliveryTag, ack._multiple, false, cancellationToken)
@@ -587,7 +598,8 @@ protected async Task<bool> HandleBasicNack(IncomingCommand cmd, CancellationToke
587598
{
588599
var args = new BasicNackEventArgs(
589600
nack._deliveryTag, nack._multiple, nack._requeue);
590-
_basicNacksWrapper.Invoke(this, args);
601+
await _basicNacksWrapper.InvokeAsync(this, args)
602+
.ConfigureAwait(false);
591603
}
592604

593605
await HandleAckNack(nack._deliveryTag, nack._multiple, true, cancellationToken)
@@ -628,15 +640,19 @@ protected virtual ulong AdjustDeliveryTag(ulong deliveryTag)
628640
return deliveryTag;
629641
}
630642

631-
protected void HandleBasicReturn(IncomingCommand cmd)
643+
protected Task HandleBasicReturn(IncomingCommand cmd)
632644
{
633645
if (!_basicReturnWrapper.IsEmpty)
634646
{
635647
var basicReturn = new BasicReturn(cmd.MethodSpan);
636648
var e = new BasicReturnEventArgs(basicReturn._replyCode, basicReturn._replyText,
637649
basicReturn._exchange, basicReturn._routingKey,
638650
new ReadOnlyBasicProperties(cmd.HeaderSpan), cmd.Body.Memory);
639-
_basicReturnWrapper.Invoke(this, e);
651+
return _basicReturnWrapper.InvokeAsync(this, e);
652+
}
653+
else
654+
{
655+
return Task.CompletedTask;
640656
}
641657
}
642658

projects/Test/Applications/PublisherConfirms/PublisherConfirms.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,15 +112,15 @@ async Task HandlePublishConfirmsAsynchronously()
112112
var allMessagesConfirmedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
113113
var outstandingConfirms = new LinkedList<ulong>();
114114
var semaphore = new SemaphoreSlim(1, 1);
115-
void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
115+
async Task CleanOutstandingConfirmsAsync(ulong deliveryTag, bool multiple)
116116
{
117117
if (debug)
118118
{
119119
Console.WriteLine("{0} [DEBUG] confirming message: {1} (multiple: {2})",
120120
DateTime.Now, deliveryTag, multiple);
121121
}
122122

123-
semaphore.Wait();
123+
await semaphore.WaitAsync();
124124
try
125125
{
126126
if (multiple)
@@ -158,11 +158,13 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
158158
}
159159
}
160160

161-
channel.BasicAcks += (sender, ea) => CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
162-
channel.BasicNacks += (sender, ea) =>
161+
channel.BasicAcksAsync += (sender, ea) =>
162+
CleanOutstandingConfirmsAsync(ea.DeliveryTag, ea.Multiple);
163+
164+
channel.BasicNacksAsync += (sender, ea) =>
163165
{
164166
Console.WriteLine($"{DateTime.Now} [WARNING] message sequence number: {ea.DeliveryTag} has been nacked (multiple: {ea.Multiple})");
165-
CleanOutstandingConfirms(ea.DeliveryTag, ea.Multiple);
167+
return CleanOutstandingConfirmsAsync(ea.DeliveryTag, ea.Multiple);
166168
};
167169

168170
var sw = new Stopwatch();

projects/Test/Common/IntegrationFixture.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,22 @@ protected void AddCallbackExceptionHandlers(IConnection conn, IChannel channel)
269269
{
270270
}
271271
};
272+
273+
channel.CallbackExceptionAsync += (o, ea) =>
274+
{
275+
_channelCallbackException = ea.Exception;
276+
277+
try
278+
{
279+
_output.WriteLine("{0} channel callback exception: {1}",
280+
_testDisplayName, _channelCallbackException);
281+
}
282+
catch (InvalidOperationException)
283+
{
284+
}
285+
286+
return Task.CompletedTask;
287+
};
272288
}
273289
}
274290

projects/Test/Integration/ConnectionRecovery/TestBasicAckAndBasicNack.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,8 +156,18 @@ public async Task TestBasicAckEventHandlerRecovery()
156156
{
157157
await _channel.ConfirmSelectAsync();
158158
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
159-
((AutorecoveringChannel)_channel).BasicAcks += (m, args) => tcs.SetResult(true);
160-
((AutorecoveringChannel)_channel).BasicNacks += (m, args) => tcs.SetResult(true);
159+
160+
((AutorecoveringChannel)_channel).BasicAcksAsync += (m, args) =>
161+
{
162+
tcs.SetResult(true);
163+
return Task.CompletedTask;
164+
};
165+
166+
((AutorecoveringChannel)_channel).BasicNacksAsync += (m, args) =>
167+
{
168+
tcs.SetResult(true);
169+
return Task.CompletedTask;
170+
};
161171

162172
await CloseAndWaitForRecoveryAsync();
163173
await CloseAndWaitForRecoveryAsync();

projects/Test/Integration/TestConcurrentAccessWithSharedChannel.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,16 @@ public async Task ConcurrentPublishSingleChannel()
5656
{
5757
int publishAckCount = 0;
5858

59-
_channel.BasicAcks += (object sender, BasicAckEventArgs e) =>
59+
_channel.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
6060
{
6161
Interlocked.Increment(ref publishAckCount);
62+
return Task.CompletedTask;
6263
};
6364

64-
_channel.BasicNacks += (object sender, BasicNackEventArgs e) =>
65+
_channel.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
6566
{
6667
_output.WriteLine($"channel #{_channel.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
68+
return Task.CompletedTask;
6769
};
6870

6971
await _channel.ConfirmSelectAsync(trackConfirmations: false);

projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,18 +125,20 @@ private Task TestConcurrentChannelOpenAndPublishingWithBodyAsync(byte[] body, in
125125

126126
await ch.ConfirmSelectAsync(trackConfirmations: false);
127127

128-
ch.BasicAcks += (object sender, BasicAckEventArgs e) =>
128+
ch.BasicAcksAsync += (object sender, BasicAckEventArgs e) =>
129129
{
130130
if (e.DeliveryTag >= _messageCount)
131131
{
132132
tcs.SetResult(true);
133133
}
134+
return Task.CompletedTask;
134135
};
135136

136-
ch.BasicNacks += (object sender, BasicNackEventArgs e) =>
137+
ch.BasicNacksAsync += (object sender, BasicNackEventArgs e) =>
137138
{
138139
tcs.SetResult(false);
139140
_output.WriteLine($"channel #{ch.ChannelNumber} saw a nack, deliveryTag: {e.DeliveryTag}, multiple: {e.Multiple}");
141+
return Task.CompletedTask;
140142
};
141143

142144
QueueDeclareOk q = await ch.QueueDeclareAsync(queue: string.Empty, passive: false, durable: false, exclusive: true, autoDelete: true, arguments: null);

0 commit comments

Comments
 (0)