Skip to content

Commit 6a45f13

Browse files
authored
Merge pull request #1677 from danielmarbach/async-connection-events
Make connection events async
2 parents b6b400d + 5b6956b commit 6a45f13

33 files changed

+402
-209
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -445,24 +445,15 @@ RabbitMQ.Client.IChannel.IsClosed.get -> bool
445445
RabbitMQ.Client.IChannel.IsOpen.get -> bool
446446
RabbitMQ.Client.IChannelExtensions
447447
RabbitMQ.Client.IConnection
448-
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
449448
RabbitMQ.Client.IConnection.ChannelMax.get -> ushort
450449
RabbitMQ.Client.IConnection.ClientProperties.get -> System.Collections.Generic.IDictionary<string, object>
451450
RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
452451
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
453-
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
454-
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
455-
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
456-
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
457-
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
458452
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
459453
RabbitMQ.Client.IConnection.FrameMax.get -> uint
460454
RabbitMQ.Client.IConnection.Heartbeat.get -> System.TimeSpan
461455
RabbitMQ.Client.IConnection.IsOpen.get -> bool
462456
RabbitMQ.Client.IConnection.Protocol.get -> RabbitMQ.Client.IProtocol
463-
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
464-
RabbitMQ.Client.IConnection.RecoveringConsumer -> System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
465-
RabbitMQ.Client.IConnection.RecoverySucceeded -> System.EventHandler<System.EventArgs>
466457
RabbitMQ.Client.IConnection.ServerProperties.get -> System.Collections.Generic.IDictionary<string, object>
467458
RabbitMQ.Client.IConnection.ShutdownReport.get -> System.Collections.Generic.IEnumerable<RabbitMQ.Client.ShutdownReportEntry>
468459
RabbitMQ.Client.IConnectionExtensions
@@ -895,3 +886,12 @@ RabbitMQ.Client.IChannel.ConfirmSelectAsync(bool trackConfirmations = true, Syst
895886
const RabbitMQ.Client.Constants.DefaultConsumerDispatchConcurrency = 1 -> ushort
896887
readonly RabbitMQ.Client.ConnectionConfig.ConsumerDispatchConcurrency -> ushort
897888
RabbitMQ.Client.IConnection.CreateChannelAsync(ushort? consumerDispatchConcurrency = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
889+
RabbitMQ.Client.IConnection.CallbackExceptionAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs!>!
890+
RabbitMQ.Client.IConnection.ConnectionBlockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs!>!
891+
RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs!>!
892+
RabbitMQ.Client.IConnection.RecoverySucceededAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs!>!
893+
RabbitMQ.Client.IConnection.ConnectionRecoveryErrorAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs!>!
894+
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs!>!
895+
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecoveryAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs!>!
896+
RabbitMQ.Client.IConnection.RecoveringConsumerAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs!>!
897+
RabbitMQ.Client.IConnection.ConnectionUnblockedAsync -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs!>!

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,7 @@ public interface IConnection : INetworkConnection, IDisposable
143143
/// <see cref="IConnection"/>, then this event will be signalled whenever one
144144
/// of those event handlers throws an exception, as well.
145145
/// </remarks>
146-
event EventHandler<CallbackExceptionEventArgs> CallbackException;
147-
148-
event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;
146+
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackExceptionAsync;
149147

150148
/// <summary>
151149
/// Raised when the connection is destroyed.
@@ -155,15 +153,15 @@ public interface IConnection : INetworkConnection, IDisposable
155153
/// event handler is added to this event, the event handler
156154
/// will be fired immediately.
157155
/// </remarks>
158-
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
156+
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync;
159157

160158
/// <summary>
161159
/// Raised when the connection completes recovery.
162160
/// </summary>
163161
/// <remarks>
164162
/// This event will never fire for connections that disable automatic recovery.
165163
/// </remarks>
166-
event EventHandler<EventArgs> RecoverySucceeded;
164+
event AsyncEventHandler<EventArgs> RecoverySucceededAsync;
167165

168166
/// <summary>
169167
/// Raised when the connection recovery fails, e.g. because reconnection or topology
@@ -172,7 +170,7 @@ public interface IConnection : INetworkConnection, IDisposable
172170
/// <remarks>
173171
/// This event will never fire for connections that disable automatic recovery.
174172
/// </remarks>
175-
event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
173+
event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryErrorAsync;
176174

177175
/// <summary>
178176
/// Raised when the server-generated tag of a consumer registered on this connection changes during
@@ -182,7 +180,7 @@ public interface IConnection : INetworkConnection, IDisposable
182180
/// <remarks>
183181
/// This event will never fire for connections that disable automatic recovery.
184182
/// </remarks>
185-
event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
183+
event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecoveryAsync;
186184

187185
/// <summary>
188186
/// Raised when the name of a server-named queue declared on this connection changes during
@@ -192,7 +190,7 @@ public interface IConnection : INetworkConnection, IDisposable
192190
/// <remarks>
193191
/// This event will never fire for connections that disable automatic recovery.
194192
/// </remarks>
195-
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;
193+
event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecoveryAsync;
196194

197195
/// <summary>
198196
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
@@ -204,9 +202,17 @@ public interface IConnection : INetworkConnection, IDisposable
204202
/// <remarks>
205203
/// This event will never fire for connections that disable automatic recovery.
206204
/// </remarks>
207-
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
205+
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAsync;
206+
207+
/// <summary>
208+
/// Raised when a connection is blocked by the AMQP broker.
209+
/// </summary>
210+
event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlockedAsync;
208211

209-
event EventHandler<EventArgs> ConnectionUnblocked;
212+
/// <summary>
213+
/// Raised when a connection is unblocked by the AMQP broker.
214+
/// </summary>
215+
event AsyncEventHandler<EventArgs> ConnectionUnblockedAsync;
210216

211217
/// <summary>
212218
/// This method updates the secret used to authenticate this connection.

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
117117
}
118118
case ProtocolCommandId.ConnectionBlocked:
119119
{
120-
HandleConnectionBlocked(cmd);
121-
return Task.FromResult(true);
120+
// Note: always returns true
121+
return HandleConnectionBlockedAsync(cmd, cancellationToken);
122122
}
123123
case ProtocolCommandId.ConnectionClose:
124124
{
@@ -128,7 +128,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
128128
case ProtocolCommandId.ConnectionSecure:
129129
{
130130
// Note: always returns true
131-
return HandleConnectionSecureAsync(cmd);
131+
return HandleConnectionSecureAsync(cmd, cancellationToken);
132132
}
133133
case ProtocolCommandId.ConnectionStart:
134134
{
@@ -138,12 +138,12 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
138138
case ProtocolCommandId.ConnectionTune:
139139
{
140140
// Note: always returns true
141-
return HandleConnectionTuneAsync(cmd);
141+
return HandleConnectionTuneAsync(cmd, cancellationToken);
142142
}
143143
case ProtocolCommandId.ConnectionUnblocked:
144144
{
145-
HandleConnectionUnblocked();
146-
return Task.FromResult(true);
145+
// Note: always returns true
146+
return HandleConnectionUnblockedAsync(cancellationToken);
147147
}
148148
default:
149149
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
4646
private Task? _recoveryTask;
4747
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();
4848

49-
private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
49+
private Task HandleConnectionShutdownAsync(object? _, ShutdownEventArgs args)
5050
{
5151
if (ShouldTriggerConnectionRecovery(args))
5252
{
@@ -57,6 +57,8 @@ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
5757
}
5858
}
5959

60+
return Task.CompletedTask;
61+
6062
static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
6163
{
6264
if (args.Initiator == ShutdownInitiator.Peer)
@@ -204,7 +206,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c
204206

205207
ESLog.Info("Connection recovery completed");
206208
ThrowIfDisposed();
207-
_recoverySucceededWrapper.Invoke(this, EventArgs.Empty);
209+
await _recoverySucceededAsyncWrapper.InvokeAsync(this, EventArgs.Empty)
210+
.ConfigureAwait(false);
208211

209212
return true;
210213
}
@@ -266,10 +269,11 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
266269
{
267270
ESLog.Error("Connection recovery exception.", e);
268271
// Trigger recovery error events
269-
if (!_connectionRecoveryErrorWrapper.IsEmpty)
272+
if (!_connectionRecoveryErrorAsyncWrapper.IsEmpty)
270273
{
271274
// Note: recordedEntities semaphore is _NOT_ held at this point
272-
_connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e));
275+
await _connectionRecoveryErrorAsyncWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e))
276+
.ConfigureAwait(false);
273277
}
274278

275279
maybeNewInnerConnection?.Dispose();
@@ -377,12 +381,13 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
377381
recordedEntitiesSemaphoreHeld: recordedEntitiesSemaphoreHeld, cancellationToken)
378382
.ConfigureAwait(false);
379383

380-
if (!_queueNameChangedAfterRecoveryWrapper.IsEmpty)
384+
if (!_queueNameChangedAfterRecoveryAsyncWrapper.IsEmpty)
381385
{
382386
try
383387
{
384388
_recordedEntitiesSemaphore.Release();
385-
_queueNameChangedAfterRecoveryWrapper.Invoke(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName));
389+
await _queueNameChangedAfterRecoveryAsyncWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName))
390+
.ConfigureAwait(false);
386391
}
387392
finally
388393
{
@@ -515,7 +520,8 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe
515520
try
516521
{
517522
_recordedEntitiesSemaphore.Release();
518-
_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));
523+
await _recoveringConsumerAsyncWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments))
524+
.ConfigureAwait(false);
519525
}
520526
finally
521527
{
@@ -531,12 +537,13 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
531537
RecordedConsumer consumerWithNewConsumerTag = RecordedConsumer.WithNewConsumerTag(newTag, consumer);
532538
UpdateConsumer(oldTag, newTag, consumerWithNewConsumerTag);
533539

534-
if (!_consumerTagChangeAfterRecoveryWrapper.IsEmpty)
540+
if (!_consumerTagChangeAfterRecoveryAsyncWrapper.IsEmpty)
535541
{
536542
try
537543
{
538544
_recordedEntitiesSemaphore.Release();
539-
_consumerTagChangeAfterRecoveryWrapper.Invoke(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag));
545+
await _consumerTagChangeAfterRecoveryAsyncWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag))
546+
.ConfigureAwait(false);
540547
}
541548
finally
542549
{

0 commit comments

Comments
 (0)