Skip to content

Commit e0ea90f

Browse files
committed
Make connection events async
1 parent 1896a62 commit e0ea90f

31 files changed

+302
-136
lines changed

projects/RabbitMQ.Client/PublicAPI.Shipped.txt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -445,24 +445,24 @@ RabbitMQ.Client.IChannel.IsOpen.get -> bool
445445
RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong
446446
RabbitMQ.Client.IChannelExtensions
447447
RabbitMQ.Client.IConnection
448-
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
448+
RabbitMQ.Client.IConnection.CallbackException -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
449449
RabbitMQ.Client.IConnection.ChannelMax.get -> ushort
450450
RabbitMQ.Client.IConnection.ClientProperties.get -> System.Collections.Generic.IDictionary<string, object>
451451
RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
452452
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
453-
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
454-
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
455-
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
456-
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
457-
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
453+
RabbitMQ.Client.IConnection.ConnectionBlocked -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
454+
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
455+
RabbitMQ.Client.IConnection.ConnectionShutdown -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
456+
RabbitMQ.Client.IConnection.ConnectionUnblocked -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
457+
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
458458
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint
459459
RabbitMQ.Client.IConnection.FrameMax.get -> uint
460460
RabbitMQ.Client.IConnection.Heartbeat.get -> System.TimeSpan
461461
RabbitMQ.Client.IConnection.IsOpen.get -> bool
462462
RabbitMQ.Client.IConnection.Protocol.get -> RabbitMQ.Client.IProtocol
463-
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
464-
RabbitMQ.Client.IConnection.RecoveringConsumer -> System.EventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
465-
RabbitMQ.Client.IConnection.RecoverySucceeded -> System.EventHandler<System.EventArgs>
463+
RabbitMQ.Client.IConnection.QueueNameChangedAfterRecovery -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.QueueNameChangedAfterRecoveryEventArgs>
464+
RabbitMQ.Client.IConnection.RecoveringConsumer -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.Events.RecoveringConsumerEventArgs>
465+
RabbitMQ.Client.IConnection.RecoverySucceeded -> RabbitMQ.Client.Events.AsyncEventHandler<System.EventArgs>
466466
RabbitMQ.Client.IConnection.ServerProperties.get -> System.Collections.Generic.IDictionary<string, object>
467467
RabbitMQ.Client.IConnection.ShutdownReport.get -> System.Collections.Generic.IEnumerable<RabbitMQ.Client.ShutdownReportEntry>
468468
RabbitMQ.Client.IConnectionExtensions

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,9 @@ public interface IConnection : INetworkConnection, IDisposable
143143
/// <see cref="IConnection"/>, then this event will be signalled whenever one
144144
/// of those event handlers throws an exception, as well.
145145
/// </remarks>
146-
event EventHandler<CallbackExceptionEventArgs> CallbackException;
146+
event AsyncEventHandler<CallbackExceptionEventArgs> CallbackException;
147147

148-
event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;
148+
event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlocked;
149149

150150
/// <summary>
151151
/// Raised when the connection is destroyed.
@@ -155,15 +155,15 @@ public interface IConnection : INetworkConnection, IDisposable
155155
/// event handler is added to this event, the event handler
156156
/// will be fired immediately.
157157
/// </remarks>
158-
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
158+
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdown;
159159

160160
/// <summary>
161161
/// Raised when the connection completes recovery.
162162
/// </summary>
163163
/// <remarks>
164164
/// This event will never fire for connections that disable automatic recovery.
165165
/// </remarks>
166-
event EventHandler<EventArgs> RecoverySucceeded;
166+
event AsyncEventHandler<EventArgs> RecoverySucceeded;
167167

168168
/// <summary>
169169
/// Raised when the connection recovery fails, e.g. because reconnection or topology
@@ -172,7 +172,7 @@ public interface IConnection : INetworkConnection, IDisposable
172172
/// <remarks>
173173
/// This event will never fire for connections that disable automatic recovery.
174174
/// </remarks>
175-
event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
175+
event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError;
176176

177177
/// <summary>
178178
/// Raised when the server-generated tag of a consumer registered on this connection changes during
@@ -182,7 +182,7 @@ public interface IConnection : INetworkConnection, IDisposable
182182
/// <remarks>
183183
/// This event will never fire for connections that disable automatic recovery.
184184
/// </remarks>
185-
event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
185+
event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery;
186186

187187
/// <summary>
188188
/// Raised when the name of a server-named queue declared on this connection changes during
@@ -192,7 +192,7 @@ public interface IConnection : INetworkConnection, IDisposable
192192
/// <remarks>
193193
/// This event will never fire for connections that disable automatic recovery.
194194
/// </remarks>
195-
event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;
195+
event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery;
196196

197197
/// <summary>
198198
/// Raised when a consumer is about to be recovered. This event raises when topology recovery
@@ -204,9 +204,9 @@ public interface IConnection : INetworkConnection, IDisposable
204204
/// <remarks>
205205
/// This event will never fire for connections that disable automatic recovery.
206206
/// </remarks>
207-
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
207+
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumer;
208208

209-
event EventHandler<EventArgs> ConnectionUnblocked;
209+
event AsyncEventHandler<EventArgs> ConnectionUnblocked;
210210

211211
/// <summary>
212212
/// This method updates the secret used to authenticate this connection.

projects/RabbitMQ.Client/client/framing/Channel.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,8 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
119119
}
120120
case ProtocolCommandId.ConnectionBlocked:
121121
{
122-
HandleConnectionBlocked(cmd);
123-
return Task.FromResult(true);
122+
// Note: always returns true
123+
return HandleConnectionBlocked(cmd, cancellationToken);
124124
}
125125
case ProtocolCommandId.ConnectionClose:
126126
{
@@ -130,7 +130,7 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
130130
case ProtocolCommandId.ConnectionSecure:
131131
{
132132
// Note: always returns true
133-
return HandleConnectionSecureAsync(cmd);
133+
return HandleConnectionSecureAsync(cmd, cancellationToken);
134134
}
135135
case ProtocolCommandId.ConnectionStart:
136136
{
@@ -140,12 +140,12 @@ protected override Task<bool> DispatchCommandAsync(IncomingCommand cmd, Cancella
140140
case ProtocolCommandId.ConnectionTune:
141141
{
142142
// Note: always returns true
143-
return HandleConnectionTuneAsync(cmd);
143+
return HandleConnectionTuneAsync(cmd, cancellationToken);
144144
}
145145
case ProtocolCommandId.ConnectionUnblocked:
146146
{
147-
HandleConnectionUnblocked();
148-
return Task.FromResult(true);
147+
// Note: always returns true
148+
return HandleConnectionUnblocked(cancellationToken);
149149
}
150150
default:
151151
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
4646
private Task? _recoveryTask;
4747
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();
4848

49-
private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
49+
private Task HandleConnectionShutdown(object? _, ShutdownEventArgs args)
5050
{
5151
if (ShouldTriggerConnectionRecovery(args))
5252
{
@@ -57,6 +57,8 @@ private void HandleConnectionShutdown(object? _, ShutdownEventArgs args)
5757
}
5858
}
5959

60+
return Task.CompletedTask;
61+
6062
static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
6163
{
6264
if (args.Initiator == ShutdownInitiator.Peer)
@@ -204,7 +206,8 @@ await RecoverChannelsAndItsConsumersAsync(recordedEntitiesSemaphoreHeld: true, c
204206

205207
ESLog.Info("Connection recovery completed");
206208
ThrowIfDisposed();
207-
_recoverySucceededWrapper.Invoke(this, EventArgs.Empty);
209+
await _recoverySucceededWrapper.InvokeAsync(this, EventArgs.Empty)
210+
.ConfigureAwait(false);
208211

209212
return true;
210213
}
@@ -269,7 +272,8 @@ await maybeNewInnerConnection.OpenAsync(cancellationToken)
269272
if (!_connectionRecoveryErrorWrapper.IsEmpty)
270273
{
271274
// Note: recordedEntities semaphore is _NOT_ held at this point
272-
_connectionRecoveryErrorWrapper.Invoke(this, new ConnectionRecoveryErrorEventArgs(e));
275+
await _connectionRecoveryErrorWrapper.InvokeAsync(this, new ConnectionRecoveryErrorEventArgs(e))
276+
.ConfigureAwait(false);
273277
}
274278

275279
maybeNewInnerConnection?.Dispose();
@@ -382,7 +386,8 @@ await RecordQueueAsync(new RecordedQueue(newName, recordedQueue),
382386
try
383387
{
384388
_recordedEntitiesSemaphore.Release();
385-
_queueNameChangedAfterRecoveryWrapper.Invoke(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName));
389+
await _queueNameChangedAfterRecoveryWrapper.InvokeAsync(this, new QueueNameChangedAfterRecoveryEventArgs(oldName, newName))
390+
.ConfigureAwait(false);
386391
}
387392
finally
388393
{
@@ -515,7 +520,8 @@ internal async ValueTask RecoverConsumersAsync(AutorecoveringChannel channelToRe
515520
try
516521
{
517522
_recordedEntitiesSemaphore.Release();
518-
_consumerAboutToBeRecovered.Invoke(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments));
523+
await _consumerAboutToBeRecoveredWrapper.InvokeAsync(this, new RecoveringConsumerEventArgs(consumer.ConsumerTag, consumer.Arguments))
524+
.ConfigureAwait(false);
519525
}
520526
finally
521527
{
@@ -536,7 +542,8 @@ await _recordedEntitiesSemaphore.WaitAsync(cancellationToken)
536542
try
537543
{
538544
_recordedEntitiesSemaphore.Release();
539-
_consumerTagChangeAfterRecoveryWrapper.Invoke(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag));
545+
await _consumerTagChangeAfterRecoveryWrapper.InvokeAsync(this, new ConsumerTagChangedAfterRecoveryEventArgs(oldTag, newTag))
546+
.ConfigureAwait(false);
540547
}
541548
finally
542549
{

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ internal AutorecoveringConnection(ConnectionConfig config, IEndpointResolver end
6767
_innerConnection = innerConnection;
6868

6969
ConnectionShutdown += HandleConnectionShutdown;
70-
_recoverySucceededWrapper = new EventingWrapper<EventArgs>("OnConnectionRecovery", onException);
71-
_connectionRecoveryErrorWrapper = new EventingWrapper<ConnectionRecoveryErrorEventArgs>("OnConnectionRecoveryError", onException);
72-
_consumerTagChangeAfterRecoveryWrapper = new EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onException);
73-
_queueNameChangedAfterRecoveryWrapper = new EventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onException);
70+
_recoverySucceededWrapper = new AsyncEventingWrapper<EventArgs>("OnConnectionRecovery", onExceptionAsync);
71+
_connectionRecoveryErrorWrapper = new AsyncEventingWrapper<ConnectionRecoveryErrorEventArgs>("OnConnectionRecoveryError", onExceptionAsync);
72+
_consumerTagChangeAfterRecoveryWrapper = new AsyncEventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onExceptionAsync);
73+
_queueNameChangedAfterRecoveryWrapper = new AsyncEventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onExceptionAsync);
7474

75-
void onException(Exception exception, string context) =>
76-
_innerConnection.OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
75+
Task onExceptionAsync(Exception exception, string context) =>
76+
_innerConnection.OnCallbackExceptionAsync(CallbackExceptionEventArgs.Build(exception, context));
7777
}
7878

7979
internal static async ValueTask<AutorecoveringConnection> CreateAsync(ConnectionConfig config, IEndpointResolver endpoints,
@@ -88,64 +88,64 @@ await innerConnection.OpenAsync(cancellationToken)
8888
return connection;
8989
}
9090

91-
public event EventHandler<EventArgs> RecoverySucceeded
91+
public event AsyncEventHandler<EventArgs> RecoverySucceeded
9292
{
9393
add => _recoverySucceededWrapper.AddHandler(value);
9494
remove => _recoverySucceededWrapper.RemoveHandler(value);
9595
}
96-
private EventingWrapper<EventArgs> _recoverySucceededWrapper;
96+
private AsyncEventingWrapper<EventArgs> _recoverySucceededWrapper;
9797

98-
public event EventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
98+
public event AsyncEventHandler<ConnectionRecoveryErrorEventArgs> ConnectionRecoveryError
9999
{
100100
add => _connectionRecoveryErrorWrapper.AddHandler(value);
101101
remove => _connectionRecoveryErrorWrapper.RemoveHandler(value);
102102
}
103-
private EventingWrapper<ConnectionRecoveryErrorEventArgs> _connectionRecoveryErrorWrapper;
103+
private AsyncEventingWrapper<ConnectionRecoveryErrorEventArgs> _connectionRecoveryErrorWrapper;
104104

105-
public event EventHandler<CallbackExceptionEventArgs> CallbackException
105+
public event AsyncEventHandler<CallbackExceptionEventArgs> CallbackException
106106
{
107107
add => InnerConnection.CallbackException += value;
108108
remove => InnerConnection.CallbackException -= value;
109109
}
110110

111-
public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
111+
public event AsyncEventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
112112
{
113113
add => InnerConnection.ConnectionBlocked += value;
114114
remove => InnerConnection.ConnectionBlocked -= value;
115115
}
116116

117-
public event EventHandler<ShutdownEventArgs> ConnectionShutdown
117+
public event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdown
118118
{
119119
add => InnerConnection.ConnectionShutdown += value;
120120
remove => InnerConnection.ConnectionShutdown -= value;
121121
}
122122

123-
public event EventHandler<EventArgs> ConnectionUnblocked
123+
public event AsyncEventHandler<EventArgs> ConnectionUnblocked
124124
{
125125
add => InnerConnection.ConnectionUnblocked += value;
126126
remove => InnerConnection.ConnectionUnblocked -= value;
127127
}
128128

129-
public event EventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
129+
public event AsyncEventHandler<ConsumerTagChangedAfterRecoveryEventArgs> ConsumerTagChangeAfterRecovery
130130
{
131131
add => _consumerTagChangeAfterRecoveryWrapper.AddHandler(value);
132132
remove => _consumerTagChangeAfterRecoveryWrapper.RemoveHandler(value);
133133
}
134-
private EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs> _consumerTagChangeAfterRecoveryWrapper;
134+
private AsyncEventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs> _consumerTagChangeAfterRecoveryWrapper;
135135

136-
public event EventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery
136+
public event AsyncEventHandler<QueueNameChangedAfterRecoveryEventArgs> QueueNameChangedAfterRecovery
137137
{
138138
add => _queueNameChangedAfterRecoveryWrapper.AddHandler(value);
139139
remove => _queueNameChangedAfterRecoveryWrapper.RemoveHandler(value);
140140
}
141-
private EventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangedAfterRecoveryWrapper;
141+
private AsyncEventingWrapper<QueueNameChangedAfterRecoveryEventArgs> _queueNameChangedAfterRecoveryWrapper;
142142

143-
public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
143+
public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
144144
{
145-
add => _consumerAboutToBeRecovered.AddHandler(value);
146-
remove => _consumerAboutToBeRecovered.RemoveHandler(value);
145+
add => _consumerAboutToBeRecoveredWrapper.AddHandler(value);
146+
remove => _consumerAboutToBeRecoveredWrapper.RemoveHandler(value);
147147
}
148-
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;
148+
private AsyncEventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecoveredWrapper;
149149

150150
public string? ClientProvidedName => _config.ClientProvidedName;
151151

0 commit comments

Comments
 (0)