Skip to content

Commit 197a22b

Browse files
committed
* Use SemaphoreSlim to block publishers when max outstanding is reached.
1 parent 9343e8d commit 197a22b

File tree

3 files changed

+20
-58
lines changed

3 files changed

+20
-58
lines changed

projects/RabbitMQ.Client/Impl/Channel.BasicPublish.cs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,6 @@ await MaybeStartPublisherConfirmationTracking(cancellationToken)
5959
await MaybeEnforceFlowControlAsync(cancellationToken)
6060
.ConfigureAwait(false);
6161

62-
await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
63-
.ConfigureAwait(false);
64-
6562
var cmd = new BasicPublish(exchange, routingKey, mandatory, default);
6663

6764
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners
@@ -117,9 +114,6 @@ await MaybeStartPublisherConfirmationTracking(cancellationToken)
117114
await MaybeEnforceFlowControlAsync(cancellationToken)
118115
.ConfigureAwait(false);
119116

120-
await MaybeEnforceOutstandingPublisherConfirmationsAsync(cancellationToken)
121-
.ConfigureAwait(false);
122-
123117
var cmd = new BasicPublishMemory(exchange.Bytes, routingKey.Bytes, mandatory, default);
124118

125119
using Activity? sendActivity = RabbitMQActivitySource.PublisherHasListeners

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 16 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,10 @@ internal partial class Channel : IChannel, IRecoverable
4848
private bool _publisherConfirmationsEnabled = false;
4949
private bool _publisherConfirmationTrackingEnabled = false;
5050
private ushort? _maxOutstandingPublisherConfirmations = null;
51+
private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
5152
private ulong _nextPublishSeqNo = 0;
5253
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
5354
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
54-
private readonly AsyncManualResetEvent _maxOutstandingPublisherConfirmsReached = new(true);
5555

5656
private class PublisherConfirmationInfo
5757
{
@@ -124,6 +124,13 @@ private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
124124
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
125125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126126
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
127+
128+
if (_maxOutstandingPublisherConfirmations is not null)
129+
{
130+
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
131+
(int)_maxOutstandingPublisherConfirmations,
132+
(int)_maxOutstandingPublisherConfirmations);
133+
}
127134
}
128135

129136
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
@@ -141,7 +148,6 @@ private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
141148
if (_publisherConfirmationTrackingEnabled)
142149
{
143150
_confirmsTaskCompletionSources.Clear();
144-
MaybeUnblockPublishers();
145151
}
146152
_nextPublishSeqNo = 1;
147153
}
@@ -187,15 +193,13 @@ private void HandleAck(ulong deliveryTag, bool multiple)
187193
{
188194
pair.Value.SetResult(true);
189195
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
190-
MaybeUnblockPublishers();
191196
}
192197
}
193198
}
194199
else
195200
{
196201
if (_confirmsTaskCompletionSources.TryRemove(deliveryTag, out TaskCompletionSource<bool>? tcs))
197202
{
198-
MaybeUnblockPublishers();
199203
tcs.SetResult(true);
200204
}
201205
}
@@ -215,15 +219,13 @@ private void HandleNack(ulong deliveryTag, bool multiple, bool isReturn)
215219
{
216220
pair.Value.SetException(new PublishException(pair.Key, isReturn));
217221
_confirmsTaskCompletionSources.Remove(pair.Key, out _);
218-
MaybeUnblockPublishers();
219222
}
220223
}
221224
}
222225
else
223226
{
224227
if (_confirmsTaskCompletionSources.Remove(deliveryTag, out TaskCompletionSource<bool>? tcs))
225228
{
226-
MaybeUnblockPublishers();
227229
tcs.SetException(new PublishException(deliveryTag, isReturn));
228230
}
229231
}
@@ -266,7 +268,6 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
266268
}
267269

268270
_confirmsTaskCompletionSources.Clear();
269-
MaybeUnblockPublishers();
270271
}
271272
}
272273
finally
@@ -291,7 +292,12 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
291292
{
292293
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
293294
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;
294-
MaybeBlockPublishers();
295+
}
296+
297+
if (_maxOutstandingConfirmationsSemaphore is not null)
298+
{
299+
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
300+
.ConfigureAwait(false);
295301
}
296302

297303
_nextPublishSeqNo++;
@@ -318,7 +324,6 @@ private bool MaybeHandleExceptionWithEnabledPublisherConfirmations(PublisherConf
318324
if (_publisherConfirmationTrackingEnabled)
319325
{
320326
_confirmsTaskCompletionSources.TryRemove(publisherConfirmationInfo.PublishSequenceNumber, out _);
321-
MaybeUnblockPublishers();
322327
}
323328

324329
exceptionWasHandled = publisherConfirmationInfo.MaybeHandleException(ex);
@@ -340,49 +345,10 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
340345
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
341346
.ConfigureAwait(false);
342347
}
343-
}
344-
}
345-
346-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
347-
private ValueTask MaybeEnforceOutstandingPublisherConfirmationsAsync(CancellationToken cancellationToken)
348-
{
349-
if (_publisherConfirmationTrackingEnabled)
350-
{
351-
if (_maxOutstandingPublisherConfirmsReached.IsSet)
352-
{
353-
return default;
354-
}
355-
else
356-
{
357-
return _maxOutstandingPublisherConfirmsReached.WaitAsync(cancellationToken);
358-
}
359-
}
360-
361-
return default;
362-
}
363-
364-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
365-
private void MaybeBlockPublishers()
366-
{
367-
if (_publisherConfirmationTrackingEnabled)
368-
{
369-
if (_maxOutstandingPublisherConfirmations is not null
370-
&& _confirmsTaskCompletionSources.Count >= _maxOutstandingPublisherConfirmations)
371-
{
372-
_maxOutstandingPublisherConfirmsReached.Reset();
373-
}
374-
}
375-
}
376348

377-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
378-
private void MaybeUnblockPublishers()
379-
{
380-
if (_publisherConfirmationTrackingEnabled)
381-
{
382-
if (_maxOutstandingPublisherConfirmations is not null
383-
&& _confirmsTaskCompletionSources.Count < _maxOutstandingPublisherConfirmations)
349+
if (_maxOutstandingConfirmationsSemaphore is not null)
384350
{
385-
_maxOutstandingPublisherConfirmsReached.Set();
351+
_maxOutstandingConfirmationsSemaphore.Release();
386352
}
387353
}
388354
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,8 @@ protected virtual void Dispose(bool disposing)
531531

532532
ConsumerDispatcher.Dispose();
533533
_rpcSemaphore.Dispose();
534-
_confirmSemaphore?.Dispose();
534+
_confirmSemaphore.Dispose();
535+
_maxOutstandingConfirmationsSemaphore?.Dispose();
535536
}
536537
}
537538

@@ -552,7 +553,8 @@ protected virtual async ValueTask DisposeAsyncCore()
552553

553554
ConsumerDispatcher.Dispose();
554555
_rpcSemaphore.Dispose();
555-
_confirmSemaphore?.Dispose();
556+
_confirmSemaphore.Dispose();
557+
_maxOutstandingConfirmationsSemaphore?.Dispose();
556558
}
557559

558560
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)

0 commit comments

Comments
 (0)