Skip to content

Commit f8a18cd

Browse files
danielmarbachlukebakken
authored andcommitted
Leverage System.Threading.RateLimiting
* Cosmetic formatting * Use `RateLimitLease` instead of `IDisposable` in internal classes. * Move `ThrottlingRateLimiter` to `RabbitMQ.Client` namespace. * Add the outline of a test for throttling publishes based on outstanding confirms. * Do not error exit on Windows when `inet_error` is found in logs * Modify `TestPublisherConfirmationThrottling` to use toxiproxy.
1 parent b751d92 commit f8a18cd

13 files changed

+332
-66
lines changed

.ci/windows/gha-log-check.ps1

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ Write-Host "[INFO] looking for errors in '$rabbitmq_log_dir'"
88

99
If (Get-ChildItem $rabbitmq_log_dir\*.log | Select-String -Quiet -SimpleMatch -Pattern inet_error)
1010
{
11-
Write-Error "[WARNING] found inet_error in '$rabbitmq_log_dir'"
11+
Write-Host -ForegroundColor Yellow "[WARNING] found inet_error in '$rabbitmq_log_dir'"
1212
exit 0
1313
}

projects/Directory.Packages.props

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
-->
1717
<PackageVersion Include="System.IO.Pipelines" Version="8.0.0" />
1818
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
19+
<PackageVersion Include="System.Threading.RateLimiting" Version="8.0.0" />
1920
<PackageVersion Include="WireMock.Net" Version="1.5.62" />
2021
<PackageVersion Include="xunit" Version="2.9.0" />
2122
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />

projects/RabbitMQ.Client/CreateChannelOptions.cs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,37 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 2.0.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
8+
//
9+
// Licensed under the Apache License, Version 2.0 (the "License");
10+
// you may not use this file except in compliance with the License.
11+
// You may obtain a copy of the License at
12+
//
13+
// https://www.apache.org/licenses/LICENSE-2.0
14+
//
15+
// Unless required by applicable law or agreed to in writing, software
16+
// distributed under the License is distributed on an "AS IS" BASIS,
17+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18+
// See the License for the specific language governing permissions and
19+
// limitations under the License.
20+
//---------------------------------------------------------------------------
21+
//
22+
// The MPL v2.0:
23+
//
24+
//---------------------------------------------------------------------------
25+
// This Source Code Form is subject to the terms of the Mozilla Public
26+
// License, v. 2.0. If a copy of the MPL was not distributed with this
27+
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
28+
//
29+
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
30+
//---------------------------------------------------------------------------
31+
32+
using System.Threading.RateLimiting;
33+
using RabbitMQ.Client.Impl;
34+
135
namespace RabbitMQ.Client
236
{
337
/// <summary>
@@ -16,15 +50,10 @@ public sealed class CreateChannelOptions
1650
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;
1751

1852
/// <summary>
19-
/// If publisher confirmation tracking is enabled, this represents the number of allowed
20-
/// outstanding publisher confirmations before publishing is blocked.
21-
///
22-
/// Defaults to <c>128</c>
23-
///
24-
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
25-
///
53+
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
54+
/// throttle additional attempts to publish once the threshold is reached.
2655
/// </summary>
27-
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
56+
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
2857

2958
/// <summary>
3059
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>

projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
using System.Collections.Generic;
3434
using System.Runtime.CompilerServices;
3535
using System.Threading;
36+
using System.Threading.RateLimiting;
3637
using System.Threading.Tasks;
3738
using RabbitMQ.Client.ConsumerDispatching;
3839
using RabbitMQ.Client.Events;
@@ -51,7 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
5152
private ushort _prefetchCountGlobal;
5253
private bool _publisherConfirmationsEnabled = false;
5354
private bool _publisherConfirmationTrackingEnabled = false;
54-
private ushort? _maxOutstandingPublisherConfirmations = null;
55+
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null;
5556
private bool _usesTransactions;
5657
private ushort _consumerDispatchConcurrency;
5758

@@ -78,14 +79,14 @@ public AutorecoveringChannel(AutorecoveringConnection conn,
7879
ushort consumerDispatchConcurrency,
7980
bool publisherConfirmationsEnabled,
8081
bool publisherConfirmationTrackingEnabled,
81-
ushort? maxOutstandingPublisherConfirmations)
82+
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
8283
{
8384
_connection = conn;
8485
_innerChannel = innerChannel;
8586
_consumerDispatchConcurrency = consumerDispatchConcurrency;
8687
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
8788
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
88-
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
89+
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
8990
}
9091

9192
public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
@@ -173,7 +174,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
173174
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
174175
_publisherConfirmationsEnabled,
175176
_publisherConfirmationTrackingEnabled,
176-
_maxOutstandingPublisherConfirmations,
177+
_outstandingPublisherConfirmationsRateLimiter,
177178
_consumerDispatchConcurrency,
178179
cancellationToken)
179180
.ConfigureAwait(false);

projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
using System.Collections.Generic;
3434
using System.Runtime.CompilerServices;
3535
using System.Threading;
36+
using System.Threading.RateLimiting;
3637
using System.Threading.Tasks;
3738
using RabbitMQ.Client.Events;
3839
using RabbitMQ.Client.Exceptions;
@@ -188,7 +189,7 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
188189
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
189190
bool publisherConfirmationsEnabled = false,
190191
bool publisherConfirmationTrackingEnabled = false,
191-
ushort? maxOutstandingPublisherConfirmations = null,
192+
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
192193
ushort? consumerDispatchConcurrency = null,
193194
CancellationToken cancellationToken = default)
194195
{
@@ -197,7 +198,7 @@ public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
197198
return (RecoveryAwareChannel)await result.OpenAsync(
198199
publisherConfirmationsEnabled,
199200
publisherConfirmationTrackingEnabled,
200-
maxOutstandingPublisherConfirmations,
201+
outstandingPublisherConfirmationsRateLimiter,
201202
cancellationToken)
202203
.ConfigureAwait(false);
203204
}
@@ -273,7 +274,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
273274
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
274275
options.PublisherConfirmationsEnabled,
275276
options.PublisherConfirmationTrackingEnabled,
276-
options.MaxOutstandingPublisherConfirmations,
277+
options.OutstandingPublisherConfirmationsRateLimiter,
277278
cdc,
278279
cancellationToken)
279280
.ConfigureAwait(false);
@@ -284,7 +285,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
284285
cdc,
285286
options.PublisherConfirmationsEnabled,
286287
options.PublisherConfirmationTrackingEnabled,
287-
options.MaxOutstandingPublisherConfirmations);
288+
options.OutstandingPublisherConfirmationsRateLimiter);
288289
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
289290
.ConfigureAwait(false);
290291
return autorecoveringChannel;

projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
using System.Diagnostics;
3636
using System.Runtime.CompilerServices;
3737
using System.Threading;
38+
using System.Threading.RateLimiting;
3839
using System.Threading.Tasks;
3940
using RabbitMQ.Client.Events;
4041
using RabbitMQ.Client.Exceptions;
@@ -47,32 +48,26 @@ internal partial class Channel : IChannel, IRecoverable
4748
{
4849
private bool _publisherConfirmationsEnabled = false;
4950
private bool _publisherConfirmationTrackingEnabled = false;
50-
private ushort? _maxOutstandingPublisherConfirmations = null;
51-
private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
5251
private ulong _nextPublishSeqNo = 0;
5352
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
5453
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
54+
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;
5555

56-
private class PublisherConfirmationInfo
56+
private sealed class PublisherConfirmationInfo : IDisposable
5757
{
58-
private ulong _publishSequenceNumber;
5958
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
59+
private readonly RateLimitLease? _lease;
6060

61-
internal PublisherConfirmationInfo()
61+
internal PublisherConfirmationInfo(ulong publishSequenceNumber,
62+
TaskCompletionSource<bool>? publisherConfirmationTcs,
63+
RateLimitLease? lease)
6264
{
63-
_publishSequenceNumber = 0;
64-
_publisherConfirmationTcs = null;
65-
}
66-
67-
internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource<bool>? publisherConfirmationTcs)
68-
{
69-
_publishSequenceNumber = publishSequenceNumber;
65+
PublishSequenceNumber = publishSequenceNumber;
7066
_publisherConfirmationTcs = publisherConfirmationTcs;
67+
_lease = lease;
7168
}
7269

73-
internal ulong PublishSequenceNumber => _publishSequenceNumber;
74-
75-
internal TaskCompletionSource<bool>? PublisherConfirmationTcs => _publisherConfirmationTcs;
70+
internal ulong PublishSequenceNumber { get; }
7671

7772
internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
7873
{
@@ -95,6 +90,11 @@ internal bool MaybeHandleException(Exception ex)
9590

9691
return exceptionWasHandled;
9792
}
93+
94+
public void Dispose()
95+
{
96+
_lease?.Dispose();
97+
}
9898
}
9999

100100
public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
@@ -119,18 +119,11 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke
119119

120120
private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
121121
bool publisherConfirmationTrackingEnabled,
122-
ushort? maxOutstandingPublisherConfirmations)
122+
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
123123
{
124124
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
125125
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
126-
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
127-
128-
if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
129-
{
130-
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
131-
(int)_maxOutstandingPublisherConfirmations,
132-
(int)_maxOutstandingPublisherConfirmations);
133-
}
126+
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
134127
}
135128

136129
private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
@@ -282,11 +275,15 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
282275
{
283276
if (_publisherConfirmationsEnabled)
284277
{
285-
if (_publisherConfirmationTrackingEnabled &&
286-
_maxOutstandingConfirmationsSemaphore is not null)
278+
RateLimitLease? lease = null;
279+
if (_publisherConfirmationTrackingEnabled)
287280
{
288-
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
289-
.ConfigureAwait(false);
281+
if (_outstandingPublisherConfirmationsRateLimiter is not null)
282+
{
283+
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
284+
cancellationToken: cancellationToken)
285+
.ConfigureAwait(false);
286+
}
290287
}
291288

292289
await _confirmSemaphore.WaitAsync(cancellationToken)
@@ -303,7 +300,7 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
303300

304301
_nextPublishSeqNo++;
305302

306-
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
303+
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
307304
}
308305
else
309306
{
@@ -339,18 +336,19 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn
339336
{
340337
if (_publisherConfirmationsEnabled)
341338
{
342-
if (_publisherConfirmationTrackingEnabled &&
343-
_maxOutstandingConfirmationsSemaphore is not null)
344-
{
345-
_maxOutstandingConfirmationsSemaphore.Release();
346-
}
347-
348339
_confirmSemaphore.Release();
349340

350341
if (publisherConfirmationInfo is not null)
351342
{
352-
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
353-
.ConfigureAwait(false);
343+
try
344+
{
345+
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
346+
.ConfigureAwait(false);
347+
}
348+
finally
349+
{
350+
publisherConfirmationInfo.Dispose();
351+
}
354352
}
355353
}
356354
}

projects/RabbitMQ.Client/Impl/Channel.cs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
using System.Runtime.CompilerServices;
3838
using System.Text;
3939
using System.Threading;
40+
using System.Threading.RateLimiting;
4041
using System.Threading.Tasks;
4142
using RabbitMQ.Client.ConsumerDispatching;
4243
using RabbitMQ.Client.Events;
@@ -362,12 +363,12 @@ protected bool Enqueue(IRpcContinuation k)
362363

363364
internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
364365
bool publisherConfirmationTrackingEnabled,
365-
ushort? maxOutstandingPublisherConfirmations,
366+
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
366367
CancellationToken cancellationToken)
367368
{
368369
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
369370
publisherConfirmationTrackingEnabled,
370-
maxOutstandingPublisherConfirmations);
371+
outstandingPublisherConfirmationsRateLimiter);
371372

372373
bool enqueued = false;
373374
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
@@ -532,7 +533,7 @@ protected virtual void Dispose(bool disposing)
532533
ConsumerDispatcher.Dispose();
533534
_rpcSemaphore.Dispose();
534535
_confirmSemaphore.Dispose();
535-
_maxOutstandingConfirmationsSemaphore?.Dispose();
536+
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
536537
}
537538
}
538539

@@ -554,7 +555,11 @@ protected virtual async ValueTask DisposeAsyncCore()
554555
ConsumerDispatcher.Dispose();
555556
_rpcSemaphore.Dispose();
556557
_confirmSemaphore.Dispose();
557-
_maxOutstandingConfirmationsSemaphore?.Dispose();
558+
if (_outstandingPublisherConfirmationsRateLimiter is not null)
559+
{
560+
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
561+
.ConfigureAwait(false);
562+
}
558563
}
559564

560565
public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)

projects/RabbitMQ.Client/Impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
276276
IChannel ch = await channel.OpenAsync(
277277
options.PublisherConfirmationsEnabled,
278278
options.PublisherConfirmationTrackingEnabled,
279-
options.MaxOutstandingPublisherConfirmations,
279+
options.OutstandingPublisherConfirmationsRateLimiter,
280280
cancellationToken)
281281
.ConfigureAwait(false);
282282
return ch;

0 commit comments

Comments
 (0)