Skip to content

Commit 89affa2

Browse files
committed
* Move publisher confirmation setup / teardown out of BasicPublishAsync.
1 parent 9775cd4 commit 89affa2

File tree

2 files changed

+149
-61
lines changed

2 files changed

+149
-61
lines changed

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 136 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Buffers.Binary;
3334
using System.Collections.Concurrent;
3435
using System.Collections.Generic;
@@ -52,6 +53,70 @@ internal partial class Channel : IChannel, IRecoverable
5253
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
5354
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
5455

56+
private class PublisherConfirmationInfo
57+
{
58+
private ulong _publishSequenceNumber;
59+
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
60+
61+
internal PublisherConfirmationInfo()
62+
{
63+
_publishSequenceNumber = 0;
64+
_publisherConfirmationTcs = null;
65+
}
66+
67+
internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource<bool>? publisherConfirmationTcs)
68+
{
69+
_publishSequenceNumber = publishSequenceNumber;
70+
_publisherConfirmationTcs = publisherConfirmationTcs;
71+
}
72+
73+
internal ulong PublishSequenceNumber => _publishSequenceNumber;
74+
75+
internal TaskCompletionSource<bool>? PublisherConfirmationTcs => _publisherConfirmationTcs;
76+
77+
internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
78+
{
79+
if (_publisherConfirmationTcs is not null)
80+
{
81+
await _publisherConfirmationTcs.Task.WaitAsync(cancellationToken)
82+
.ConfigureAwait(false);
83+
}
84+
}
85+
86+
internal bool MaybeHandleException(Exception ex)
87+
{
88+
bool exceptionWasHandled = false;
89+
90+
if (_publisherConfirmationTcs is not null)
91+
{
92+
_publisherConfirmationTcs.SetException(ex);
93+
exceptionWasHandled = true;
94+
}
95+
96+
return exceptionWasHandled;
97+
}
98+
}
99+
100+
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
101+
{
102+
if (_publisherConfirmationsEnabled)
103+
{
104+
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
105+
try
106+
{
107+
return _nextPublishSeqNo;
108+
}
109+
finally
110+
{
111+
_confirmSemaphore.Release();
112+
}
113+
}
114+
else
115+
{
116+
return _nextPublishSeqNo;
117+
}
118+
}
119+
55120
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled, bool publisherConfirmationTrackingEnabled)
56121
{
57122
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
@@ -98,10 +163,17 @@ await ModelSendAsync(in method, k.CancellationToken)
98163
}
99164
}
100165

166+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
167+
private bool ShouldHandleAckOrNack(ulong deliveryTag)
168+
{
169+
return _publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled &&
170+
deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty;
171+
}
172+
101173
[MethodImpl(MethodImplOptions.AggressiveInlining)]
102174
private void HandleAck(ulong deliveryTag, bool multiple)
103175
{
104-
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
176+
if (ShouldHandleAckOrNack(deliveryTag))
105177
{
106178
if (multiple)
107179
{
@@ -127,7 +199,7 @@ private void HandleAck(ulong deliveryTag, bool multiple)
127199
[MethodImpl(MethodImplOptions.AggressiveInlining)]
128200
private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
129201
{
130-
if (_publisherConfirmationsEnabled && _publisherConfirmationTrackingEnabled && deliveryTag > 0 && !_confirmsTaskCompletionSources.IsEmpty)
202+
if (ShouldHandleAckOrNack(deliveryTag))
131203
{
132204
if (multiple)
133205
{
@@ -192,5 +264,67 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
192264
}
193265
}
194266
}
267+
268+
private async Task<PublisherConfirmationInfo?> MaybeStartPublisherConfirmationTracking(CancellationToken cancellationToken)
269+
{
270+
if (_publisherConfirmationsEnabled)
271+
{
272+
await _confirmSemaphore.WaitAsync(cancellationToken)
273+
.ConfigureAwait(false);
274+
275+
ulong publishSequenceNumber = _nextPublishSeqNo;
276+
277+
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
278+
if (_publisherConfirmationTrackingEnabled)
279+
{
280+
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
281+
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
282+
}
283+
284+
_nextPublishSeqNo++;
285+
286+
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
287+
}
288+
else
289+
{
290+
return null;
291+
}
292+
}
293+
294+
private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConfirmationInfo? publisherConfirmationInfo,
295+
Exception ex)
296+
{
297+
bool exceptionWasHandled = false;
298+
299+
if (_publisherConfirmationsEnabled &&
300+
publisherConfirmationInfo is not null)
301+
{
302+
_nextPublishSeqNo--;
303+
304+
if (_publisherConfirmationTrackingEnabled)
305+
{
306+
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
307+
}
308+
309+
exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex);
310+
}
311+
312+
return exceptionWasHandled;
313+
}
314+
315+
private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationInfo? publisherConfirmationInfo,
316+
CancellationToken cancellationToken)
317+
{
318+
if (_publisherConfirmationsEnabled)
319+
{
320+
_confirmSemaphore.Release();
321+
322+
if (publisherConfirmationInfo is not null)
323+
{
324+
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
325+
.ConfigureAwait(false);
326+
}
327+
}
328+
}
195329
}
196330
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 13 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -807,26 +807,6 @@ await Session.Connection.HandleConnectionUnblockedAsync(cancellationToken)
807807
return true;
808808
}
809809

810-
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
811-
{
812-
if (_publisherConfirmationsEnabled)
813-
{
814-
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
815-
try
816-
{
817-
return _nextPublishSeqNo;
818-
}
819-
finally
820-
{
821-
_confirmSemaphore.Release();
822-
}
823-
}
824-
else
825-
{
826-
return _nextPublishSeqNo;
827-
}
828-
}
829-
830810
public virtual ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
831811
CancellationToken cancellationToken)
832812
{
@@ -969,26 +949,13 @@ public async ValueTask BasicPublishAsync<TProperties>(string exchange, string ro
969949
CancellationToken cancellationToken = default)
970950
where TProperties : IReadOnlyBasicProperties, IAmqpHeader
971951
{
972-
TaskCompletionSource<bool>? publisherConfirmationTcs = null;
973-
ulong publishSequenceNumber = 0;
952+
PublisherConfirmationInfo? publisherConfirmationInfo = null;
974953
try
975954
{
976-
if (_publisherConfirmationsEnabled)
977-
{
978-
await _confirmSemaphore.WaitAsync(cancellationToken)
955+
publisherConfirmationInfo =
956+
await MaybeStartPublisherConfirmationTracking(cancellationToken)
979957
.ConfigureAwait(false);
980958

981-
publishSequenceNumber = _nextPublishSeqNo;
982-
983-
if (_publisherConfirmationTrackingEnabled)
984-
{
985-
publisherConfirmationTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
986-
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
987-
}
988-
989-
_nextPublishSeqNo++;
990-
}
991-
992959
await EnforceFlowControlAsync(cancellationToken)
993960
.ConfigureAwait(false);
994961

@@ -998,6 +965,12 @@ await EnforceFlowControlAsync(cancellationToken)
998965
? RabbitMQActivitySource.Send(routingKey, exchange, body.Length)
999966
: default;
1000967

968+
ulong publishSequenceNumber = 0;
969+
if (publisherConfirmationInfo is not null)
970+
{
971+
publishSequenceNumber = publisherConfirmationInfo.PublishSequenceNumber;
972+
}
973+
1001974
BasicProperties? props = PopulateBasicPropertiesHeaders(basicProperties, sendActivity, publishSequenceNumber);
1002975
if (props is null)
1003976
{
@@ -1012,35 +985,16 @@ await ModelSendAsync(in cmd, in props, body, cancellationToken)
1012985
}
1013986
catch (Exception ex)
1014987
{
1015-
if (_publisherConfirmationsEnabled)
1016-
{
1017-
_nextPublishSeqNo--;
1018-
if (_publisherConfirmationTrackingEnabled)
1019-
{
1020-
_confirmsTaskCompletionSources.TryRemove(publishSequenceNumber, out _);
1021-
}
1022-
}
1023-
1024-
if (publisherConfirmationTcs is not null)
1025-
{
1026-
publisherConfirmationTcs.SetException(ex);
1027-
}
1028-
else
988+
bool exceptionWasHandled =
989+
MaybeHandleExceptionWithEnabledPublisherConfirmations(publisherConfirmationInfo, ex);
990+
if (!exceptionWasHandled)
1029991
{
1030992
throw;
1031993
}
1032994
}
1033995
finally
1034996
{
1035-
if (_publisherConfirmationsEnabled)
1036-
{
1037-
_confirmSemaphore.Release();
1038-
}
1039-
}
1040-
1041-
if (publisherConfirmationTcs is not null)
1042-
{
1043-
await publisherConfirmationTcs.Task.WaitAsync(cancellationToken)
997+
await MaybeEndPublisherConfirmationTracking(publisherConfirmationInfo, cancellationToken)
1044998
.ConfigureAwait(false);
1045999
}
10461000
}

0 commit comments

Comments
 (0)