Skip to content

Commit a3ac58c

Browse files
committed
* Change IConnection event ConnectionShutdown to AsyncEventHandler
* Remove sync methods from `IConsumerDispatcher` * Make several `ISession` methods async * Enable confirms in `PublishMessagesWhileClosingConnAsync` * Ensure `RABBITMQ_LONG_RUNNING_TESTS` and SSL env vars are set for sequential integration tests * Don't immediately await the frame handler close in `TestDisposedWithSocketClosedOutOfBand`
1 parent 344a00a commit a3ac58c

34 files changed

+373
-262
lines changed

.github/workflows/build-test.yaml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ jobs:
7373
Receive-Job -Job $tx; `
7474
& "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; `
7575
dotnet test `
76-
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
7776
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
77+
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
7878
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' `
7979
--environment 'PASSWORD=grapefruit' `
8080
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
@@ -114,7 +114,12 @@ jobs:
114114
id: install-start-rabbitmq
115115
run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1
116116
- name: Sequential Integration Tests
117-
run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
117+
run: dotnet test `
118+
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' `
119+
--environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" `
120+
--environment 'PASSWORD=grapefruit' `
121+
--environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" `
122+
"${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
118123
- name: Maybe upload RabbitMQ logs
119124
if: failure()
120125
uses: actions/upload-artifact@v4
@@ -182,8 +187,8 @@ jobs:
182187
- name: Integration Tests
183188
run: |
184189
dotnet test \
185-
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
186190
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
191+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
187192
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
188193
--environment 'PASSWORD=grapefruit' \
189194
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
@@ -222,7 +227,10 @@ jobs:
222227
- name: Sequential Integration Tests
223228
run: |
224229
dotnet test \
230+
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
225231
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \
232+
--environment 'PASSWORD=grapefruit' \
233+
--environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \
226234
"${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed'
227235
- name: Maybe upload RabbitMQ logs
228236
if: failure()

Makefile

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ build:
2121
test:
2222
dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed'
2323
dotnet test --environment 'GITHUB_ACTIONS=true' \
24-
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
2524
--environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
25+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
2626
--environment 'RABBITMQ_TOXIPROXY_TESTS=true' \
2727
--environment 'PASSWORD=grapefruit' \
2828
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
2929
"$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed'
30-
dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
30+
dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \
31+
--environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \
32+
--environment 'PASSWORD=grapefruit' \
33+
--environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \
34+
$(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed'
3135

3236
# Note:
3337
# You must have the expected OAuth2 environment set up for this target

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,7 +505,7 @@ RabbitMQ.Client.IConnection.ClientProvidedName.get -> string
505505
RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs
506506
RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler<RabbitMQ.Client.Events.ConnectionBlockedEventArgs>
507507
RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler<RabbitMQ.Client.Events.ConnectionRecoveryErrorEventArgs>
508-
RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler<RabbitMQ.Client.ShutdownEventArgs>
508+
RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler<RabbitMQ.Client.ShutdownEventArgs>
509509
RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler<System.EventArgs>
510510
RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler<RabbitMQ.Client.Events.ConsumerTagChangedAfterRecoveryEventArgs>
511511
RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint

projects/RabbitMQ.Client/client/api/IConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public interface IConnection : INetworkConnection, IDisposable
155155
/// event handler is added to this event, the event handler
156156
/// will be fired immediately.
157157
/// </remarks>
158-
event EventHandler<ShutdownEventArgs> ConnectionShutdown;
158+
event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync;
159159

160160
/// <summary>
161161
/// Raised when the connection completes recovery.

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection
4646
private Task? _recoveryTask;
4747
private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource();
4848

49-
private void HandleConnectionShutdown(object _, ShutdownEventArgs args)
49+
private Task HandleConnectionShutdownAsync(object _, ShutdownEventArgs args)
5050
{
5151
if (ShouldTriggerConnectionRecovery(args))
5252
{
@@ -80,6 +80,8 @@ static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
8080

8181
return false;
8282
}
83+
84+
return Task.CompletedTask;
8385
}
8486

8587
private async Task RecoverConnectionAsync()

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void onException(Exception exception, string context) =>
8888
_consumerTagChangeAfterRecoveryWrapper = new EventingWrapper<ConsumerTagChangedAfterRecoveryEventArgs>("OnConsumerRecovery", onException);
8989
_queueNameChangedAfterRecoveryWrapper = new EventingWrapper<QueueNameChangedAfterRecoveryEventArgs>("OnQueueRecovery", onException);
9090

91-
ConnectionShutdown += HandleConnectionShutdown;
91+
ConnectionShutdownAsync += HandleConnectionShutdownAsync;
9292
}
9393

9494
public event EventHandler<EventArgs> RecoverySucceeded
@@ -117,10 +117,10 @@ public event EventHandler<ConnectionBlockedEventArgs> ConnectionBlocked
117117
remove => InnerConnection.ConnectionBlocked -= value;
118118
}
119119

120-
public event EventHandler<ShutdownEventArgs> ConnectionShutdown
120+
public event AsyncEventHandler<ShutdownEventArgs> ConnectionShutdownAsync
121121
{
122-
add => InnerConnection.ConnectionShutdown += value;
123-
remove => InnerConnection.ConnectionShutdown -= value;
122+
add => InnerConnection.ConnectionShutdownAsync += value;
123+
remove => InnerConnection.ConnectionShutdownAsync -= value;
124124
}
125125

126126
public event EventHandler<EventArgs> ConnectionUnblocked

projects/RabbitMQ.Client/client/impl/ChannelBase.cs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ protected ChannelBase(ConnectionConfig config, ISession session)
8484

8585
Action<Exception, string> onException = (exception, context) =>
8686
OnCallbackException(CallbackExceptionEventArgs.Build(exception, context));
87+
8788
_basicAcksWrapper = new EventingWrapper<BasicAckEventArgs>("OnBasicAck", onException);
8889
_basicNacksWrapper = new EventingWrapper<BasicNackEventArgs>("OnBasicNack", onException);
8990
_basicReturnWrapper = new EventingWrapper<BasicReturnEventArgs>("OnBasicReturn", onException);
@@ -93,7 +94,7 @@ protected ChannelBase(ConnectionConfig config, ISession session)
9394
_channelShutdownWrapper = new EventingWrapper<ShutdownEventArgs>("OnChannelShutdown", onException);
9495
_recoveryWrapper = new EventingWrapper<EventArgs>("OnChannelRecovery", onException);
9596
session.CommandReceived = HandleCommandAsync;
96-
session.SessionShutdown += OnSessionShutdown;
97+
session.SessionShutdownAsync += OnSessionShutdownAsync;
9798
Session = session;
9899
}
99100

@@ -407,12 +408,13 @@ await ModelSendAsync(method, k.CancellationToken)
407408
}
408409
}
409410

410-
internal void FinishClose()
411+
internal async Task FinishCloseAsync()
411412
{
412413
ShutdownEventArgs reason = CloseReason;
413414
if (reason != null)
414415
{
415-
Session.Close(reason);
416+
await Session.CloseAsync(reason)
417+
.ConfigureAwait(false);
416418
}
417419

418420
m_connectionStartCell?.TrySetResult(null);
@@ -488,7 +490,7 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
488490
if (_confirmsTaskCompletionSources?.Count > 0)
489491
{
490492
var exception = new AlreadyClosedException(reason);
491-
foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
493+
foreach (TaskCompletionSource<bool> confirmsTaskCompletionSource in _confirmsTaskCompletionSources)
492494
{
493495
confirmsTaskCompletionSource.TrySetException(exception);
494496
}
@@ -505,14 +507,12 @@ private void OnChannelShutdown(ShutdownEventArgs reason)
505507
_flowControlBlock.Set();
506508
}
507509

508-
// TODO async
509-
private void OnSessionShutdown(object sender, ShutdownEventArgs reason)
510+
private Task OnSessionShutdownAsync(object sender, ShutdownEventArgs reason)
510511
{
511512
ConsumerDispatcher.Quiesce();
512513
SetCloseReason(reason);
513514
OnChannelShutdown(reason);
514-
// TODO async
515-
ConsumerDispatcher.Shutdown(reason);
515+
return ConsumerDispatcher.ShutdownAsync(reason);
516516
}
517517

518518
internal bool SetCloseReason(ShutdownEventArgs reason)
@@ -723,7 +723,8 @@ protected async Task<bool> HandleChannelCloseAsync(IncomingCommand cmd, Cancella
723723
channelClose._classId,
724724
channelClose._methodId));
725725

726-
Session.Close(CloseReason, false);
726+
await Session.CloseAsync(CloseReason, false)
727+
.ConfigureAwait(false);
727728

728729
var method = new ChannelCloseOk();
729730
await ModelSendAsync(method, cancellationToken)
@@ -734,7 +735,8 @@ await ModelSendAsync(method, cancellationToken)
734735
finally
735736
{
736737
cmd.ReturnBuffers();
737-
Session.Notify();
738+
await Session.NotifyAsync()
739+
.ConfigureAwait(false);
738740
}
739741
}
740742

@@ -746,7 +748,8 @@ protected async Task<bool> HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel
746748
* Note:
747749
* This call _must_ come before completing the async continuation
748750
*/
749-
FinishClose();
751+
await FinishCloseAsync()
752+
.ConfigureAwait(false);
750753

751754
if (_continuationQueue.TryPeek<ChannelCloseAsyncRpcContinuation>(out var k))
752755
{
@@ -815,7 +818,8 @@ protected async Task<bool> HandleConnectionCloseAsync(IncomingCommand cmd, Cance
815818
var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId);
816819
try
817820
{
818-
Session.Connection.ClosedViaPeer(reason);
821+
await Session.Connection.ClosedViaPeerAsync(reason)
822+
.ConfigureAwait(false);
819823

820824
var replyMethod = new ConnectionCloseOk();
821825
await ModelSendAsync(replyMethod, cancellationToken)

projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ private async void HeartbeatReadTimerCallback(object? state)
109109
{
110110
var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds");
111111
LogCloseError(eose.Message, eose);
112-
HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose));
112+
await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose))
113+
.ConfigureAwait(false);
113114
shouldTerminate = true;
114115
}
115116
}

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ await ReceiveLoopAsync(mainLoopToken)
5959
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
6060
0, "End of stream",
6161
exception: eose);
62-
HandleMainLoopException(ea);
62+
await HandleMainLoopExceptionAsync(ea)
63+
.ConfigureAwait(false);
6364
}
6465
catch (HardProtocolException hpe)
6566
{
@@ -75,15 +76,29 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken)
7576
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
7677
Constants.InternalError, fileLoadException.Message,
7778
exception: fileLoadException);
78-
HandleMainLoopException(ea);
79+
await HandleMainLoopExceptionAsync(ea)
80+
.ConfigureAwait(false);
81+
}
82+
catch (OperationCanceledException ocex)
83+
{
84+
if (ocex.CancellationToken != mainLoopToken)
85+
{
86+
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
87+
Constants.InternalError,
88+
$"Unexpected Exception: {ocex.Message}",
89+
exception: ocex);
90+
await HandleMainLoopExceptionAsync(ea)
91+
.ConfigureAwait(false);
92+
}
7993
}
8094
catch (Exception ex)
8195
{
8296
var ea = new ShutdownEventArgs(ShutdownInitiator.Library,
8397
Constants.InternalError,
8498
$"Unexpected Exception: {ex.Message}",
8599
exception: ex);
86-
HandleMainLoopException(ea);
100+
await HandleMainLoopExceptionAsync(ea)
101+
.ConfigureAwait(false);
87102
}
88103

89104
using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout);
@@ -175,26 +190,29 @@ private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop =
175190
MaybeStopHeartbeatTimers();
176191
}
177192

178-
private void HandleMainLoopException(ShutdownEventArgs reason)
193+
private Task HandleMainLoopExceptionAsync(ShutdownEventArgs reason)
179194
{
180195
string message = reason.GetLogMessage();
181196
if (false == SetCloseReason(reason))
182197
{
183198
LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception);
184-
return;
199+
return Task.CompletedTask;
185200
}
186201

187202
_channel0.MaybeSetConnectionStartException(reason.Exception);
188203

189-
OnShutdown(reason);
190204
LogCloseError($"Unexpected connection closure: {message}", reason.Exception);
205+
206+
return OnShutdownAsync(reason);
191207
}
192208

193209
private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, CancellationToken cancellationToken)
194210
{
195211
if (SetCloseReason(hpe.ShutdownReason))
196212
{
197-
OnShutdown(hpe.ShutdownReason);
213+
await OnShutdownAsync(hpe.ShutdownReason)
214+
.ConfigureAwait(false);
215+
198216
await _session0.SetSessionClosingAsync(false)
199217
.ConfigureAwait(false);
200218
try

0 commit comments

Comments
 (0)