Skip to content

Commit 9a86e3c

Browse files
committed
* Change IConnection event ConnectionShutdown to AsyncEventHandler
* Remove sync methods from `IConsumerDispatcher` * Make several `ISession` methods async
1 parent 344a00a commit 9a86e3c

31 files changed

+269
-230
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
505505
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
506506
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
507507
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
508-
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
508+
RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
509509
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
510510
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
511511
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public interface IConnection : INetworkConnection, IDisposable
155155
/// event handler is added to this event, the event handler
156156
/// will be fired immediately.
157157
/// </remarks>
158-
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
158+
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync;
159159

160160
/// <summary>
161161
/// Raised when the connection completes recovery.

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
4646
private Task? _recoveryTask;
4747
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();
4848

49-
private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
49+
private Task HandleConnectionShutdownAsync(object _, ShutdownEventArgs args)
5050
{
5151
if (ShouldTriggerConnectionRecovery(args))
5252
{
@@ -80,6 +80,8 @@ static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
8080

8181
return false;
8282
}
83+
84+
return Task.CompletedTask;
8385
}
8486

8587
private async Task RecoverConnectionAsync()

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void onException(Exception exception, string context) =>
8888
_consumerTagChangeAfterRecoveryWrapper = new EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onException);
8989
_queueNameChangedAfterRecoveryWrapper = new EventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onException);
9090

91-
ConnectionShutdown += HandleConnectionShutdown;
91+
ConnectionShutdownAsync += HandleConnectionShutdownAsync;
9292
}
9393

9494
public event EventHandler<EventArgs> RecoverySucceeded
@@ -117,10 +117,10 @@ public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
117117
remove => InnerConnection.ConnectionBlocked -= value;
118118
}
119119

120-
public event EventHandler<ShutdownEventArgs> ConnectionShutdown
120+
public event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync
121121
{
122-
add => InnerConnection.ConnectionShutdown += value;
123-
remove => InnerConnection.ConnectionShutdown -= value;
122+
add => InnerConnection.ConnectionShutdownAsync += value;
123+
remove => InnerConnection.ConnectionShutdownAsync -= value;
124124
}
125125

126126
public event EventHandler<EventArgs> ConnectionUnblocked

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ protected ChannelBase(ConnectionConfig config, ISession session)
8484

8585
Action<Exception, string> onException = (exception, context) =>
8686
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
87+
8788
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
8889
_basicNacksWrapper = new EventingWrapper<BasicNackEventArgs>("OnBasicNack", onException);
8990
_basicReturnWrapper = new EventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onException);
@@ -93,7 +94,7 @@ protected ChannelBase(ConnectionConfig config, ISession session)
9394
_channelShutdownWrapper = new EventingWrapper<ShutdownEventArgs>("OnChannelShutdown", onException);
9495
_recoveryWrapper = new EventingWrapper<EventArgs>("OnChannelRecovery", onException);
9596
session.CommandReceived = HandleCommandAsync;
96-
session.SessionShutdown += OnSessionShutdown;
97+
session.SessionShutdownAsync += OnSessionShutdownAsync;
9798
Session = session;
9899
}
99100

@@ -407,12 +408,13 @@ await ModelSendAsync(method, k.CancellationToken)
407408
}
408409
}
409410

410-
internal void FinishClose()
411+
internal async Task FinishCloseAsync()
411412
{
412413
ShutdownEventArgs reason = CloseReason;
413414
if (reason != null)
414415
{
415-
Session.Close(reason);
416+
await Session.CloseAsync(reason)
417+
.ConfigureAwait(false);
416418
}
417419

418420
m_connectionStartCell?.TrySetResult(null);
@@ -505,14 +507,12 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
505507
_flowControlBlock.Set();
506508
}
507509

508-
// TODO async
509-
private void OnSessionShutdown(object sender, ShutdownEventArgs reason)
510+
private Task OnSessionShutdownAsync(object sender, ShutdownEventArgs reason)
510511
{
511512
ConsumerDispatcher.Quiesce();
512513
SetCloseReason(reason);
513514
OnChannelShutdown(reason);
514-
// TODO async
515-
ConsumerDispatcher.Shutdown(reason);
515+
return ConsumerDispatcher.ShutdownAsync(reason);
516516
}
517517

518518
internal bool SetCloseReason(ShutdownEventArgs reason)
@@ -723,7 +723,8 @@ protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, Cancella
723723
channelClose._classId,
724724
channelClose._methodId));
725725

726-
Session.Close(CloseReason, false);
726+
await Session.CloseAsync(CloseReason, false)
727+
.ConfigureAwait(false);
727728

728729
var method = new ChannelCloseOk();
729730
await ModelSendAsync(method, cancellationToken)
@@ -734,7 +735,8 @@ await ModelSendAsync(method, cancellationToken)
734735
finally
735736
{
736737
cmd.ReturnBuffers();
737-
Session.Notify();
738+
await Session.NotifyAsync()
739+
.ConfigureAwait(false);
738740
}
739741
}
740742

@@ -746,7 +748,8 @@ protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
746748
* Note:
747749
* This call _must_ come before completing the async continuation
748750
*/
749-
FinishClose();
751+
await FinishCloseAsync()
752+
.ConfigureAwait(false);
750753

751754
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out var k))
752755
{
@@ -815,7 +818,8 @@ protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, Cance
815818
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
816819
try
817820
{
818-
Session.Connection.ClosedViaPeer(reason);
821+
await Session.Connection.ClosedViaPeerAsync(reason)
822+
.ConfigureAwait(false);
819823

820824
var replyMethod = new ConnectionCloseOk();
821825
await ModelSendAsync(replyMethod, cancellationToken)

projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ private async void HeartbeatReadTimerCallback(object? state)
109109
{
110110
var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds");
111111
LogCloseError(eose.Message, eose);
112-
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
112+
await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose))
113+
.ConfigureAwait(false);
113114
shouldTerminate = true;
114115
}
115116
}

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ await ReceiveLoopAsync(mainLoopToken)
5959
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
6060
0, "End of stream",
6161
exception: eose);
62-
HandleMainLoopException(ea);
62+
await HandleMainLoopExceptionAsync(ea)
63+
.ConfigureAwait(false);
6364
}
6465
catch (HardProtocolException hpe)
6566
{
@@ -75,15 +76,17 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
7576
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
7677
Constants.InternalError, fileLoadException.Message,
7778
exception: fileLoadException);
78-
HandleMainLoopException(ea);
79+
await HandleMainLoopExceptionAsync(ea)
80+
.ConfigureAwait(false);
7981
}
8082
catch (Exception ex)
8183
{
8284
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
8385
Constants.InternalError,
8486
$"Unexpected Exception: {ex.Message}",
8587
exception: ex);
86-
HandleMainLoopException(ea);
88+
await HandleMainLoopExceptionAsync(ea)
89+
.ConfigureAwait(false);
8790
}
8891

8992
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
@@ -175,26 +178,29 @@ private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop =
175178
MaybeStopHeartbeatTimers();
176179
}
177180

178-
private void HandleMainLoopException(ShutdownEventArgs reason)
181+
private Task HandleMainLoopExceptionAsync(ShutdownEventArgs reason)
179182
{
180183
string message = reason.GetLogMessage();
181184
if (false == SetCloseReason(reason))
182185
{
183186
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
184-
return;
187+
return Task.CompletedTask;
185188
}
186189

187190
_channel0.MaybeSetConnectionStartException(reason.Exception);
188191

189-
OnShutdown(reason);
190192
LogCloseError($"Unexpected connection closure: {message}", reason.Exception);
193+
194+
return OnShutdownAsync(reason);
191195
}
192196

193197
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, CancellationToken cancellationToken)
194198
{
195199
if (SetCloseReason(hpe.ShutdownReason))
196200
{
197-
OnShutdown(hpe.ShutdownReason);
201+
await OnShutdownAsync(hpe.ShutdownReason)
202+
.ConfigureAwait(false);
203+
198204
await _session0.SetSessionClosingAsync(false)
199205
.ConfigureAwait(false);
200206
try

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,13 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler)
6464
_config = config;
6565
_frameHandler = frameHandler;
6666

67-
Action<Exception, string> onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
67+
Action<Exception, string> onException = (exception, context) =>
68+
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
69+
6870
_callbackExceptionWrapper = new EventingWrapper<CallbackExceptionEventArgs>(string.Empty, (exception, context) => { });
6971
_connectionBlockedWrapper = new EventingWrapper<ConnectionBlockedEventArgs>("OnConnectionBlocked", onException);
7072
_connectionUnblockedWrapper = new EventingWrapper<EventArgs>("OnConnectionUnblocked", onException);
71-
_connectionShutdownWrapper = new EventingWrapper<ShutdownEventArgs>("OnShutdown", onException);
73+
_connectionShutdownWrapperAsync = new AsyncEventingWrapper<ShutdownEventArgs>("OnConnectionShutdown", onException);
7274

7375
_sessionManager = new SessionManager(this, 0);
7476
_session0 = new MainSession(this);
@@ -146,15 +148,15 @@ public event EventHandler<RecoveringConsumerEventArgs> RecoveringConsumer
146148
}
147149
private EventingWrapper<RecoveringConsumerEventArgs> _consumerAboutToBeRecovered;
148150

149-
public event EventHandler<ShutdownEventArgs> ConnectionShutdown
151+
public event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync
150152
{
151153
add
152154
{
153155
ThrowIfDisposed();
154156
ShutdownEventArgs? reason = CloseReason;
155157
if (reason is null)
156158
{
157-
_connectionShutdownWrapper.AddHandler(value);
159+
_connectionShutdownWrapperAsync.AddHandler(value);
158160
}
159161
else
160162
{
@@ -164,10 +166,10 @@ public event EventHandler<ShutdownEventArgs> ConnectionShutdown
164166
remove
165167
{
166168
ThrowIfDisposed();
167-
_connectionShutdownWrapper.RemoveHandler(value);
169+
_connectionShutdownWrapperAsync.RemoveHandler(value);
168170
}
169171
}
170-
private EventingWrapper<ShutdownEventArgs> _connectionShutdownWrapper;
172+
private AsyncEventingWrapper<ShutdownEventArgs> _connectionShutdownWrapperAsync;
171173

172174
/// <summary>
173175
/// This event is never fired by non-recovering connections but it is a part of the <see cref="IConnection"/> interface.
@@ -210,7 +212,7 @@ internal void TakeOver(Connection other)
210212
_callbackExceptionWrapper.Takeover(other._callbackExceptionWrapper);
211213
_connectionBlockedWrapper.Takeover(other._connectionBlockedWrapper);
212214
_connectionUnblockedWrapper.Takeover(other._connectionUnblockedWrapper);
213-
_connectionShutdownWrapper.Takeover(other._connectionShutdownWrapper);
215+
_connectionShutdownWrapperAsync.Takeover(other._connectionShutdownWrapperAsync);
214216
}
215217

216218
internal async ValueTask<IConnection> OpenAsync(CancellationToken cancellationToken)
@@ -321,7 +323,9 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
321323
{
322324
cancellationToken.ThrowIfCancellationRequested();
323325

324-
OnShutdown(reason);
326+
await OnShutdownAsync(reason)
327+
.ConfigureAwait(false);
328+
325329
await _session0.SetSessionClosingAsync(false)
326330
.ConfigureAwait(false);
327331

@@ -394,7 +398,7 @@ await _frameHandler.CloseAsync(cancellationToken)
394398
}
395399
}
396400

397-
internal void ClosedViaPeer(ShutdownEventArgs reason)
401+
internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)
398402
{
399403
if (false == SetCloseReason(reason))
400404
{
@@ -405,30 +409,39 @@ internal void ClosedViaPeer(ShutdownEventArgs reason)
405409
// We are quiescing, but still allow for server-close
406410
}
407411

408-
OnShutdown(reason);
412+
await OnShutdownAsync(reason)
413+
.ConfigureAwait(false);
414+
409415
_session0.SetSessionClosing(true);
416+
410417
MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true);
411418
}
412419

413420
// Only call at the end of the Mainloop or HeartbeatLoop
414421
private async Task FinishCloseAsync(CancellationToken cancellationToken)
415422
{
416423
_mainLoopCts.Cancel();
424+
417425
_closed = true;
426+
418427
MaybeStopHeartbeatTimers();
419428

420429
await _frameHandler.CloseAsync(cancellationToken)
421430
.ConfigureAwait(false);
431+
422432
_channel0.SetCloseReason(CloseReason);
423-
_channel0.FinishClose();
433+
434+
await _channel0.FinishCloseAsync()
435+
.ConfigureAwait(false);
436+
424437
RabbitMqClientEventSource.Log.ConnectionClosed();
425438
}
426439

427440
///<summary>Broadcasts notification of the final shutdown of the connection.</summary>
428-
private void OnShutdown(ShutdownEventArgs reason)
441+
private Task OnShutdownAsync(ShutdownEventArgs reason)
429442
{
430443
ThrowIfDisposed();
431-
_connectionShutdownWrapper.Invoke(this, reason);
444+
return _connectionShutdownWrapperAsync.InvokeAsync(this, reason);
432445
}
433446

434447
private bool SetCloseReason(ShutdownEventArgs reason)

0 commit comments

Comments
 (0)