Skip to content

Commit 6ff9685

Browse files
committed
Make session event async
1 parent 9f6dd2b commit 6ff9685

File tree

10 files changed

+65
-81
lines changed

10 files changed

+65
-81
lines changed

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ await Task.Delay(_config.NetworkRecoveryInterval, token)
126126

127127
/// <summary>
128128
/// Async cancels the main recovery loop and will block until the loop finishes, or the timeout
129-
/// expires, to prevent Close operations overlapping with recovery operations.
129+
/// expires, to prevent CloseAsync operations overlapping with recovery operations.
130130
/// </summary>
131131
private async ValueTask StopRecoveryLoopAsync(CancellationToken cancellationToken)
132132
{

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ protected ChannelBase(ConnectionConfig config, ISession session,
9090
_channelShutdownWrapper = new EventingWrapper<ShutdownEventArgs>("OnChannelShutdown", onException);
9191
_recoveryWrapper = new EventingWrapper<EventArgs>("OnChannelRecovery", onException);
9292
session.CommandReceived = HandleCommandAsync;
93-
session.SessionShutdown += OnSessionShutdown;
93+
session.SessionShutdownAsync += OnSessionShutdownAsync;
9494
Session = session;
9595
}
9696

@@ -403,12 +403,13 @@ await ModelSendAsync(method, k.CancellationToken)
403403
}
404404
}
405405

406-
internal void FinishClose()
406+
internal async Task FinishCloseAsync(CancellationToken cancellationToken)
407407
{
408408
ShutdownEventArgs? reason = CloseReason;
409409
if (reason != null)
410410
{
411-
Session.Close(reason);
411+
await Session.CloseAsync(reason, cancellationToken)
412+
.ConfigureAwait(false);
412413
}
413414

414415
m_connectionStartCell?.TrySetResult(null);
@@ -470,7 +471,7 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
470471
///<summary>Broadcasts notification of the final shutdown of the channel.</summary>
471472
///<remarks>
472473
///<para>
473-
///Do not call anywhere other than at the end of OnSessionShutdown.
474+
///Do not call anywhere other than at the end of OnSessionShutdownAsync.
474475
///</para>
475476
///<para>
476477
///Must not be called when m_closeReason is null, because
@@ -517,12 +518,13 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
517518
*
518519
* Aborted PR: https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1551
519520
*/
520-
private void OnSessionShutdown(object? sender, ShutdownEventArgs reason)
521+
private Task OnSessionShutdownAsync(object? sender, ShutdownEventArgs reason)
521522
{
522523
ConsumerDispatcher.Quiesce();
523524
SetCloseReason(reason);
524525
OnChannelShutdown(reason);
525526
ConsumerDispatcher.Shutdown(reason);
527+
return Task.CompletedTask;
526528
}
527529

528530
[MemberNotNull(nameof(_closeReason))]
@@ -533,7 +535,7 @@ internal bool SetCloseReason(ShutdownEventArgs reason)
533535
throw new ArgumentNullException(nameof(reason));
534536
}
535537

536-
// NB: this ensures that Close is only called once on a channel
538+
// NB: this ensures that CloseAsync is only called once on a channel
537539
return Interlocked.CompareExchange(ref _closeReason, reason, null) is null;
538540
}
539541

@@ -649,13 +651,15 @@ protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, Cancella
649651
channelClose._classId,
650652
channelClose._methodId));
651653

652-
Session.Close(_closeReason, false);
654+
await Session.CloseAsync(_closeReason, false, cancellationToken)
655+
.ConfigureAwait(false);
653656

654657
var method = new ChannelCloseOk();
655658
await ModelSendAsync(method, cancellationToken)
656659
.ConfigureAwait(false);
657660

658-
Session.Notify();
661+
await Session.NotifyAsync(cancellationToken)
662+
.ConfigureAwait(false);
659663
return true;
660664
}
661665

@@ -665,7 +669,8 @@ protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
665669
* Note:
666670
* This call _must_ come before completing the async continuation
667671
*/
668-
FinishClose();
672+
await FinishCloseAsync(cancellationToken)
673+
.ConfigureAwait(false);
669674

670675
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out ChannelCloseAsyncRpcContinuation? k))
671676
{
@@ -715,7 +720,7 @@ protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, Cance
715720
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
716721
try
717722
{
718-
await Session.Connection.ClosedViaPeerAsync(reason)
723+
await Session.Connection.ClosedViaPeerAsync(reason, cancellationToken)
719724
.ConfigureAwait(false);
720725

721726
var replyMethod = new ConnectionCloseOk();

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe,
228228
if (SetCloseReason(hpe.ShutdownReason))
229229
{
230230
await OnShutdownAsync(hpe.ShutdownReason).ConfigureAwait(false);
231-
await _session0.SetSessionClosingAsync(false)
231+
await _session0.SetSessionClosingAsync(false, mainLoopCancellationToken)
232232
.ConfigureAwait(false);
233233
try
234234
{

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti
332332

333333
await OnShutdownAsync(reason)
334334
.ConfigureAwait(false);
335-
await _session0.SetSessionClosingAsync(false)
335+
await _session0.SetSessionClosingAsync(false, cancellationToken)
336336
.ConfigureAwait(false);
337337

338338
try
@@ -411,7 +411,7 @@ await _frameHandler.CloseAsync(cancellationToken)
411411
}
412412
}
413413

414-
internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)
414+
internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason, CancellationToken cancellationToken)
415415
{
416416
if (false == SetCloseReason(reason))
417417
{
@@ -424,7 +424,7 @@ internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason)
424424

425425
await OnShutdownAsync(reason)
426426
.ConfigureAwait(false);
427-
await _session0.SetSessionClosingAsync(true)
427+
await _session0.SetSessionClosingAsync(true, cancellationToken)
428428
.ConfigureAwait(false);
429429
MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true);
430430
}
@@ -436,9 +436,11 @@ private async Task FinishCloseAsync(CancellationToken cancellationToken)
436436
_closed = true;
437437
MaybeStopHeartbeatTimers();
438438

439-
await _frameHandler.CloseAsync(cancellationToken).ConfigureAwait(false);
439+
await _frameHandler.CloseAsync(cancellationToken)
440+
.ConfigureAwait(false);
440441
_channel0.SetCloseReason(CloseReason!);
441-
_channel0.FinishClose();
442+
await _channel0.FinishCloseAsync(cancellationToken)
443+
.ConfigureAwait(false);
442444
RabbitMqClientEventSource.Log.ConnectionClosed();
443445
}
444446

projects/RabbitMQ.Client/client/impl/ISession.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
using System;
3333
using System.Threading;
3434
using System.Threading.Tasks;
35+
using RabbitMQ.Client.Events;
3536
using RabbitMQ.Client.Framing.Impl;
3637

3738
namespace RabbitMQ.Client.Impl
@@ -68,15 +69,15 @@ internal interface ISession
6869
///<summary>
6970
/// Multicast session shutdown event.
7071
///</summary>
71-
event EventHandler<ShutdownEventArgs> SessionShutdown;
72+
event AsyncEventHandler<ShutdownEventArgs> SessionShutdownAsync;
7273

73-
void Close(ShutdownEventArgs reason);
74+
Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken);
7475

75-
void Close(ShutdownEventArgs reason, bool notify);
76+
Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken);
7677

7778
Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken);
7879

79-
void Notify();
80+
Task NotifyAsync(CancellationToken cancellationToken);
8081

8182
ValueTask TransmitAsync<T>(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod;
8283

projects/RabbitMQ.Client/client/impl/MainSession.cs

Lines changed: 3 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -82,38 +82,10 @@ public override Task HandleFrameAsync(InboundFrame frame, CancellationToken canc
8282
return base.HandleFrameAsync(frame, cancellationToken);
8383
}
8484

85-
///<summary> Set channel 0 as quiescing </summary>
86-
///<remarks>
87-
/// Method should be idempotent. Cannot use base.Close
88-
/// method call because that would prevent us from
89-
/// sending/receiving Close/CloseOk commands
90-
///</remarks>
91-
public void SetSessionClosing(bool closeIsServerInitiated)
85+
public async Task SetSessionClosingAsync(bool closeIsServerInitiated, CancellationToken cancellationToken)
9286
{
93-
if (_closingSemaphore.Wait(InternalConstants.DefaultConnectionAbortTimeout))
94-
{
95-
try
96-
{
97-
if (false == _closing)
98-
{
99-
_closing = true;
100-
_closeIsServerInitiated = closeIsServerInitiated;
101-
}
102-
}
103-
finally
104-
{
105-
_closingSemaphore.Release();
106-
}
107-
}
108-
else
109-
{
110-
throw new InvalidOperationException("couldn't enter semaphore");
111-
}
112-
}
113-
114-
public async Task SetSessionClosingAsync(bool closeIsServerInitiated)
115-
{
116-
if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout).ConfigureAwait(false))
87+
if (await _closingSemaphore.WaitAsync(InternalConstants.DefaultConnectionAbortTimeout, cancellationToken)
88+
.ConfigureAwait(false))
11789
{
11890
try
11991
{

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
using System.Threading;
3737
using System.Threading.Tasks;
3838
using RabbitMQ.Client.client.framing;
39+
using RabbitMQ.Client.Events;
3940
using RabbitMQ.Client.Exceptions;
4041
using RabbitMQ.Client.Framing.Impl;
4142
using RabbitMQ.Client.Logging;
@@ -58,13 +59,13 @@ protected SessionBase(Connection connection, ushort channelNumber)
5859
RabbitMqClientEventSource.Log.ChannelOpened();
5960
}
6061

61-
public event EventHandler<ShutdownEventArgs> SessionShutdown
62+
public event AsyncEventHandler<ShutdownEventArgs> SessionShutdownAsync
6263
{
6364
add
6465
{
6566
if (CloseReason is null)
6667
{
67-
_sessionShutdownWrapper.AddHandler(value);
68+
_sessionShutdownAsyncWrapper.AddHandler(value);
6869
}
6970
else
7071
{
@@ -73,10 +74,10 @@ public event EventHandler<ShutdownEventArgs> SessionShutdown
7374
}
7475
remove
7576
{
76-
_sessionShutdownWrapper.RemoveHandler(value);
77+
_sessionShutdownAsyncWrapper.RemoveHandler(value);
7778
}
7879
}
79-
private EventingWrapper<ShutdownEventArgs> _sessionShutdownWrapper;
80+
private AsyncEventingWrapper<ShutdownEventArgs> _sessionShutdownAsyncWrapper;
8081

8182
public ushort ChannelNumber { get; }
8283

@@ -86,29 +87,17 @@ public event EventHandler<ShutdownEventArgs> SessionShutdown
8687
[MemberNotNullWhen(false, nameof(CloseReason))]
8788
public bool IsOpen => CloseReason is null;
8889

89-
public Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason)
90-
{
91-
Close(reason);
92-
return Task.CompletedTask;
93-
}
94-
95-
public void OnSessionShutdown(ShutdownEventArgs reason)
96-
{
97-
Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
98-
_sessionShutdownWrapper.Invoke(this, reason);
99-
}
100-
10190
public override string ToString()
10291
{
10392
return $"{GetType().Name}#{ChannelNumber}:{Connection}";
10493
}
10594

106-
public void Close(ShutdownEventArgs reason)
95+
public Task CloseAsync(ShutdownEventArgs reason, CancellationToken cancellationToken)
10796
{
108-
Close(reason, true);
97+
return CloseAsync(reason, true, cancellationToken);
10998
}
11099

111-
public void Close(ShutdownEventArgs reason, bool notify)
100+
public Task CloseAsync(ShutdownEventArgs reason, bool notify, CancellationToken cancellationToken)
112101
{
113102
if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null)
114103
{
@@ -117,23 +106,25 @@ public void Close(ShutdownEventArgs reason, bool notify)
117106

118107
if (notify)
119108
{
120-
OnSessionShutdown(CloseReason!);
109+
return OnSessionShutdownAsync(CloseReason!);
121110
}
111+
112+
return Task.CompletedTask;
122113
}
123114

124115
public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken);
125116

126-
public void Notify()
117+
public Task NotifyAsync(CancellationToken cancellationToken)
127118
{
128119
// Ensure that we notify only when session is already closed
129120
// If not, throw exception, since this is a serious bug in the library
130121
ShutdownEventArgs? reason = CloseReason;
131122
if (reason is null)
132123
{
133-
throw new InvalidOperationException("Internal Error in SessionBase.Notify");
124+
throw new InvalidOperationException("Internal Error in SessionBase.NotifyAsync");
134125
}
135126

136-
OnSessionShutdown(reason);
127+
return OnSessionShutdownAsync(reason);
137128
}
138129

139130
public virtual ValueTask TransmitAsync<T>(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod
@@ -161,6 +152,17 @@ public ValueTask TransmitAsync<TMethod, THeader>(in TMethod cmd, in THeader head
161152
RabbitMQActivitySource.PopulateMessageEnvelopeSize(Activity.Current, bytes.Size);
162153
return Connection.WriteAsync(bytes, cancellationToken);
163154
}
155+
156+
private Task OnConnectionShutdownAsync(object? conn, ShutdownEventArgs reason)
157+
{
158+
return CloseAsync(reason, CancellationToken.None);
159+
}
160+
161+
private Task OnSessionShutdownAsync(ShutdownEventArgs reason)
162+
{
163+
Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync;
164+
return _sessionShutdownAsyncWrapper.InvokeAsync(this, reason);
165+
}
164166

165167
private void ThrowAlreadyClosedException()
166168
=> throw new AlreadyClosedException(CloseReason!);

projects/RabbitMQ.Client/client/impl/SessionManager.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System.Collections.Generic;
33+
using System.Threading.Tasks;
3334
using RabbitMQ.Client.Exceptions;
3435
using RabbitMQ.Client.Framing.Impl;
3536
using RabbitMQ.Util;
@@ -77,20 +78,21 @@ public ISession Create()
7778

7879
ISession session = new Session(_connection,
7980
(ushort)channelNumber, _maxInboundMessageBodySize);
80-
session.SessionShutdown += HandleSessionShutdown;
81+
session.SessionShutdownAsync += HandleSessionShutdownAsync;
8182
_sessionMap[channelNumber] = session;
8283
return session;
8384
}
8485
}
8586

86-
private void HandleSessionShutdown(object? sender, ShutdownEventArgs reason)
87+
private Task HandleSessionShutdownAsync(object? sender, ShutdownEventArgs reason)
8788
{
8889
lock (_sessionMap)
8990
{
9091
var session = (ISession)sender!;
9192
_sessionMap.Remove(session.ChannelNumber);
9293
_ints.Free(session.ChannelNumber);
9394
}
95+
return Task.CompletedTask;
9496
}
9597

9698
public ISession Lookup(int number)

projects/Test/Integration/TestChannelShutdown.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,10 +55,10 @@ public async Task TestConsumerDispatcherShutdown()
5555
tcs.SetResult(true);
5656
};
5757

58-
Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close");
58+
Assert.False(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync");
5959
await _channel.CloseAsync();
6060
await WaitAsync(tcs, TimeSpan.FromSeconds(5), "channel shutdown");
61-
Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close");
61+
Assert.True(autorecoveringChannel.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
6262
}
6363
}
6464
}

projects/Test/Integration/TestConnectionShutdown.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ public async Task TestConsumerDispatcherShutdown()
156156
{
157157
tcs.SetResult(true);
158158
};
159-
Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before Close");
159+
Assert.False(m.ConsumerDispatcher.IsShutdown, "dispatcher should NOT be shut down before CloseAsync");
160160
await _conn.CloseAsync();
161161
await WaitAsync(tcs, TimeSpan.FromSeconds(3), "channel shutdown");
162-
Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after Close");
162+
Assert.True(m.ConsumerDispatcher.IsShutdown, "dispatcher should be shut down after CloseAsync");
163163
}
164164

165165
[Fact]

0 commit comments

Comments
 (0)