Skip to content

Spike: Leverage RateLimiting #1704

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion projects/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
-->
<PackageVersion Include="System.IO.Pipelines" Version="6.0.0" />
<PackageVersion Include="System.Net.Http" Version="4.3.4" />
<PackageVersion Include="System.Threading.RateLimiting" Version="7.0.1" />
<PackageVersion Include="WireMock.Net" Version="1.5.62" />
<PackageVersion Include="xunit" Version="2.9.0" />
<PackageVersion Include="xunit.abstractions" Version="2.0.3" />
Expand All @@ -34,7 +35,7 @@
<PackageVersion Include="System.Threading.Channels" Version="6.0.0" />
<PackageVersion Include="System.Text.Json" Version="6.0.10" />
<PackageVersion Include="System.Net.Http.Json" Version="6.0.0" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="7.0.0" />
</ItemGroup>
<ItemGroup Condition="$(TargetFramework)=='net472'">
<PackageVersion Include="System.Text.Json" Version="6.0.10" />
Expand Down
45 changes: 37 additions & 8 deletions projects/RabbitMQ.Client/CreateChannelOptions.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,37 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
//
// The APL v2.0:
//
//---------------------------------------------------------------------------
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//---------------------------------------------------------------------------
//
// The MPL v2.0:
//
//---------------------------------------------------------------------------
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright (c) 2007-2024 Broadcom. All Rights Reserved.
//---------------------------------------------------------------------------

using System.Threading.RateLimiting;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
/// <summary>
Expand All @@ -16,15 +50,10 @@ public sealed class CreateChannelOptions
public bool PublisherConfirmationTrackingEnabled { get; set; } = false;

/// <summary>
/// If publisher confirmation tracking is enabled, this represents the number of allowed
/// outstanding publisher confirmations before publishing is blocked.
///
/// Defaults to <c>128</c>
///
/// Set to <c>null</c>, to allow an unlimited number of outstanding confirmations.
///
/// If the publisher confirmation tracking is enabled, this represents the rate limiter used to
/// throttle additional attempts to publish once the threshold is reached.
/// </summary>
public ushort? MaxOutstandingPublisherConfirmations { get; set; } = 128;
public RateLimiter? OutstandingPublisherConfirmationsRateLimiter { get; set; } = new ThrottlingRateLimiter(128);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably all this could be further improved by setting the default implementation when the tracking is enabled and when the tracking is disabled add a No-Op implementation. Then the code might become super straightforward in the publisher confirm implementation


/// <summary>
/// Set to a value greater than one to enable concurrent processing. For a concurrency greater than one <see cref="IAsyncBasicConsumer"/>
Expand Down
9 changes: 5 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand All @@ -51,7 +52,7 @@ internal sealed class AutorecoveringChannel : IChannel, IRecoverable
private ushort _prefetchCountGlobal;
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter = null;
private bool _usesTransactions;
private ushort _consumerDispatchConcurrency;

Expand All @@ -78,14 +79,14 @@ public AutorecoveringChannel(AutorecoveringConnection conn,
ushort consumerDispatchConcurrency,
bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
{
_connection = conn;
_innerChannel = innerChannel;
_consumerDispatchConcurrency = consumerDispatchConcurrency;
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}

public event AsyncEventHandler<BasicAckEventArgs> BasicAcksAsync
Expand Down Expand Up @@ -173,7 +174,7 @@ internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection con
RecoveryAwareChannel newChannel = await conn.CreateNonRecoveringChannelAsync(
_publisherConfirmationsEnabled,
_publisherConfirmationTrackingEnabled,
_maxOutstandingPublisherConfirmations,
_outstandingPublisherConfirmationsRateLimiter,
_consumerDispatchConcurrency,
cancellationToken)
.ConfigureAwait(false);
Expand Down
9 changes: 5 additions & 4 deletions projects/RabbitMQ.Client/Impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand Down Expand Up @@ -188,7 +189,7 @@ public event AsyncEventHandler<RecoveringConsumerEventArgs> RecoveringConsumerAs
public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
bool publisherConfirmationsEnabled = false,
bool publisherConfirmationTrackingEnabled = false,
ushort? maxOutstandingPublisherConfirmations = null,
RateLimiter? outstandingPublisherConfirmationsRateLimiter = null,
ushort? consumerDispatchConcurrency = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -197,7 +198,7 @@ public async ValueTask<RecoveryAwareChannel> CreateNonRecoveringChannelAsync(
return (RecoveryAwareChannel)await result.OpenAsync(
publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations,
outstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
}
Expand Down Expand Up @@ -273,7 +274,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
RecoveryAwareChannel recoveryAwareChannel = await CreateNonRecoveringChannelAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
options.OutstandingPublisherConfirmationsRateLimiter,
cdc,
cancellationToken)
.ConfigureAwait(false);
Expand All @@ -284,7 +285,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
cdc,
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations);
options.OutstandingPublisherConfirmationsRateLimiter);
await RecordChannelAsync(autorecoveringChannel, channelsSemaphoreHeld: false, cancellationToken: cancellationToken)
.ConfigureAwait(false);
return autorecoveringChannel;
Expand Down
72 changes: 32 additions & 40 deletions projects/RabbitMQ.Client/Impl/Channel.PublisherConfirms.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
Expand All @@ -47,32 +48,31 @@ internal partial class Channel : IChannel, IRecoverable
{
private bool _publisherConfirmationsEnabled = false;
private bool _publisherConfirmationTrackingEnabled = false;
private ushort? _maxOutstandingPublisherConfirmations = null;
private SemaphoreSlim? _maxOutstandingConfirmationsSemaphore;
private ulong _nextPublishSeqNo = 0;
private readonly SemaphoreSlim _confirmSemaphore = new(1, 1);
private readonly ConcurrentDictionary<ulong, TaskCompletionSource<bool>> _confirmsTaskCompletionSources = new();
private RateLimiter? _outstandingPublisherConfirmationsRateLimiter;

private class PublisherConfirmationInfo
private sealed class PublisherConfirmationInfo : IDisposable
{
private ulong _publishSequenceNumber;
private TaskCompletionSource<bool>? _publisherConfirmationTcs;
private readonly IDisposable? _lease;

internal PublisherConfirmationInfo()
{
_publishSequenceNumber = 0;
PublishSequenceNumber = 0;
_publisherConfirmationTcs = null;
_lease = null;
}

internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource<bool>? publisherConfirmationTcs)
internal PublisherConfirmationInfo(ulong publishSequenceNumber, TaskCompletionSource<bool>? publisherConfirmationTcs, IDisposable? lease)
{
_publishSequenceNumber = publishSequenceNumber;
PublishSequenceNumber = publishSequenceNumber;
_publisherConfirmationTcs = publisherConfirmationTcs;
_lease = lease;
}

internal ulong PublishSequenceNumber => _publishSequenceNumber;

internal TaskCompletionSource<bool>? PublisherConfirmationTcs => _publisherConfirmationTcs;
internal ulong PublishSequenceNumber { get; }

internal async Task MaybeWaitForConfirmationAsync(CancellationToken cancellationToken)
{
Expand All @@ -95,6 +95,11 @@ internal bool MaybeHandleException(Exception ex)

return exceptionWasHandled;
}

public void Dispose()
{
_lease?.Dispose();
}
}

public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
Expand All @@ -119,18 +124,11 @@ public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToke

private void ConfigurePublisherConfirmations(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations)
RateLimiter? outstandingPublisherConfirmationsRateLimiter)
{
_publisherConfirmationsEnabled = publisherConfirmationsEnabled;
_publisherConfirmationTrackingEnabled = publisherConfirmationTrackingEnabled;
_maxOutstandingPublisherConfirmations = maxOutstandingPublisherConfirmations;

if (_publisherConfirmationTrackingEnabled && _maxOutstandingPublisherConfirmations is not null)
{
_maxOutstandingConfirmationsSemaphore = new SemaphoreSlim(
(int)_maxOutstandingPublisherConfirmations,
(int)_maxOutstandingPublisherConfirmations);
}
_outstandingPublisherConfirmationsRateLimiter = outstandingPublisherConfirmationsRateLimiter;
}

private async Task MaybeConfirmSelect(CancellationToken cancellationToken)
Expand Down Expand Up @@ -282,15 +280,14 @@ await _confirmSemaphore.WaitAsync(reason.CancellationToken)
{
if (_publisherConfirmationsEnabled)
{
RateLimitLease? lease = null;
if (_publisherConfirmationTrackingEnabled)
{
if (_maxOutstandingPublisherConfirmations is not null)
if (_outstandingPublisherConfirmationsRateLimiter is not null)
{
int percentOfMax = _confirmsTaskCompletionSources.Count / (int)_maxOutstandingPublisherConfirmations;
if (percentOfMax > 0.5)
{
await Task.Delay(1000 * percentOfMax).ConfigureAwait(false);
}
lease = await _outstandingPublisherConfirmationsRateLimiter.AcquireAsync(
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}
}

Expand All @@ -304,17 +301,11 @@ await _confirmSemaphore.WaitAsync(cancellationToken)
{
publisherConfirmationTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_confirmsTaskCompletionSources[publishSequenceNumber] = publisherConfirmationTcs;

if (_maxOutstandingConfirmationsSemaphore is not null)
{
await _maxOutstandingConfirmationsSemaphore.WaitAsync(cancellationToken)
.ConfigureAwait(false);
}
}

_nextPublishSeqNo++;

return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs);
return new PublisherConfirmationInfo(publishSequenceNumber, publisherConfirmationTcs, lease);
}
else
{
Expand Down Expand Up @@ -354,14 +345,15 @@ private async Task MaybeEndPublisherConfirmationTracking(PublisherConfirmationIn

if (publisherConfirmationInfo is not null)
{
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}

if (_publisherConfirmationTrackingEnabled &&
_maxOutstandingConfirmationsSemaphore is not null)
{
_maxOutstandingConfirmationsSemaphore.Release();
try
{
await publisherConfirmationInfo.MaybeWaitForConfirmationAsync(cancellationToken)
.ConfigureAwait(false);
}
finally
{
publisherConfirmationInfo.Dispose();
}
}
}
}
Expand Down
13 changes: 9 additions & 4 deletions projects/RabbitMQ.Client/Impl/Channel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.RateLimiting;
using System.Threading.Tasks;
using RabbitMQ.Client.ConsumerDispatching;
using RabbitMQ.Client.Events;
Expand Down Expand Up @@ -362,12 +363,12 @@ protected bool Enqueue(IRpcContinuation k)

internal async Task<IChannel> OpenAsync(bool publisherConfirmationsEnabled,
bool publisherConfirmationTrackingEnabled,
ushort? maxOutstandingPublisherConfirmations,
RateLimiter? outstandingPublisherConfirmationsRateLimiter,
CancellationToken cancellationToken)
{
ConfigurePublisherConfirmations(publisherConfirmationsEnabled,
publisherConfirmationTrackingEnabled,
maxOutstandingPublisherConfirmations);
outstandingPublisherConfirmationsRateLimiter);

bool enqueued = false;
var k = new ChannelOpenAsyncRpcContinuation(ContinuationTimeout, cancellationToken);
Expand Down Expand Up @@ -532,7 +533,7 @@ protected virtual void Dispose(bool disposing)
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_maxOutstandingConfirmationsSemaphore?.Dispose();
_outstandingPublisherConfirmationsRateLimiter?.Dispose();
}
}

Expand All @@ -554,7 +555,11 @@ protected virtual async ValueTask DisposeAsyncCore()
ConsumerDispatcher.Dispose();
_rpcSemaphore.Dispose();
_confirmSemaphore.Dispose();
_maxOutstandingConfirmationsSemaphore?.Dispose();
if(_outstandingPublisherConfirmationsRateLimiter is not null)
{
await _outstandingPublisherConfirmationsRateLimiter.DisposeAsync()
.ConfigureAwait(false);
}
}

public Task ConnectionTuneOkAsync(ushort channelMax, uint frameMax, ushort heartbeat, CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/Impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ public async Task<IChannel> CreateChannelAsync(CreateChannelOptions? options = d
IChannel ch = await channel.OpenAsync(
options.PublisherConfirmationsEnabled,
options.PublisherConfirmationTrackingEnabled,
options.MaxOutstandingPublisherConfirmations,
options.OutstandingPublisherConfirmationsRateLimiter,
cancellationToken)
.ConfigureAwait(false);
return ch;
Expand Down
Loading
Loading