Skip to content

Commit 32315b6

Browse files
committed
* Move ThrottlingRateLimiter to RabbitMQ.Client namespace.
* Add the outline of a test for throttling publishes based on outstanding confirms.
1 parent 9d6b0c5 commit 32315b6

File tree

4 files changed

+80
-12
lines changed

4 files changed

+80
-12
lines changed

projects/RabbitMQ.Client/PublicAPI.Unshipped.txt

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
const RabbitMQ.Client.Constants.PublishSequenceNumberHeader = "x-dotnet-pub-seq-no" -> string!
2-
const RabbitMQ.Client.Impl.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int
3-
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease!>
4-
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease!
5-
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.Dispose(bool disposing) -> void
6-
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
7-
override RabbitMQ.Client.Impl.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
2+
const RabbitMQ.Client.ThrottlingRateLimiter.DefaultThrottlingPercentage = 50 -> int
3+
override RabbitMQ.Client.ThrottlingRateLimiter.AcquireAsyncCore(int permitCount, System.Threading.CancellationToken cancellationToken) -> System.Threading.Tasks.ValueTask<System.Threading.RateLimiting.RateLimitLease!>
4+
override RabbitMQ.Client.ThrottlingRateLimiter.AttemptAcquireCore(int permitCount) -> System.Threading.RateLimiting.RateLimitLease!
5+
override RabbitMQ.Client.ThrottlingRateLimiter.Dispose(bool disposing) -> void
6+
override RabbitMQ.Client.ThrottlingRateLimiter.GetStatistics() -> System.Threading.RateLimiting.RateLimiterStatistics?
7+
override RabbitMQ.Client.ThrottlingRateLimiter.IdleDuration.get -> System.TimeSpan?
88
RabbitMQ.Client.CreateChannelOptions
99
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.get -> ushort?
1010
RabbitMQ.Client.CreateChannelOptions.ConsumerDispatchConcurrency.set -> void
@@ -26,8 +26,8 @@ RabbitMQ.Client.Exceptions.RabbitMQClientException.RabbitMQClientException() ->
2626
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
2727
RabbitMQ.Client.IChannel.BasicPublishAsync<TProperties>(string! exchange, string! routingKey, bool mandatory, TProperties basicProperties, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
2828
RabbitMQ.Client.IConnection.CreateChannelAsync(RabbitMQ.Client.CreateChannelOptions? options = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task<RabbitMQ.Client.IChannel!>!
29-
RabbitMQ.Client.Impl.ThrottlingRateLimiter
30-
RabbitMQ.Client.Impl.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
29+
RabbitMQ.Client.ThrottlingRateLimiter
30+
RabbitMQ.Client.ThrottlingRateLimiter.ThrottlingRateLimiter(int maxConcurrentCalls, int? throttlingPercentage = 50) -> void
3131
static RabbitMQ.Client.CreateChannelOptions.Default.get -> RabbitMQ.Client.CreateChannelOptions!
3232
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, bool mandatory, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask
3333
static RabbitMQ.Client.IChannelExtensions.BasicPublishAsync(this RabbitMQ.Client.IChannel! channel, RabbitMQ.Client.CachedString! exchange, RabbitMQ.Client.CachedString! routingKey, System.ReadOnlyMemory<byte> body, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask

projects/RabbitMQ.Client/Impl/ThrottlingRateLimiter.cs renamed to projects/RabbitMQ.Client/ThrottlingRateLimiter.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
using System.Threading.RateLimiting;
3535
using System.Threading.Tasks;
3636

37-
namespace RabbitMQ.Client.Impl
37+
namespace RabbitMQ.Client
3838
{
3939
public class ThrottlingRateLimiter : RateLimiter
4040
{

projects/Test/Integration/TestBasicPublish.cs

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,5 +322,73 @@ public async Task TestPropertiesRoundtrip_Headers()
322322
Assert.Equal(sendBody, consumeBody);
323323
Assert.Equal("World", response);
324324
}
325+
326+
[Fact]
327+
public async Task TestPublisherConfirmationThrottling()
328+
{
329+
const int MaxOutstandingConfirms = 4;
330+
331+
var channelOpts = new CreateChannelOptions
332+
{
333+
PublisherConfirmationsEnabled = true,
334+
PublisherConfirmationTrackingEnabled = true,
335+
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
336+
};
337+
338+
var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
339+
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
340+
Task publishTask = Task.Run(async () =>
341+
{
342+
ConnectionFactory cf = CreateConnectionFactory();
343+
344+
await using (IConnection conn = await cf.CreateConnectionAsync())
345+
{
346+
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
347+
{
348+
QueueDeclareOk q = await ch.QueueDeclareAsync();
349+
350+
channelCreatedTcs.SetResult(true);
351+
352+
int publishCount = 0;
353+
/*
354+
* Note: if batchSize equals MaxOutstandingConfirms,
355+
* a delay is added per-publish and this test takes much longer
356+
* to run. TODO figure out how the heck to test that
357+
*/
358+
int batchSize = MaxOutstandingConfirms / 2;
359+
try
360+
{
361+
while (publishCount < 128)
362+
{
363+
var publishBatch = new List<ValueTask>();
364+
for (int i = 0; i < batchSize; i++)
365+
{
366+
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
367+
}
368+
369+
foreach (ValueTask pt in publishBatch)
370+
{
371+
await pt;
372+
publishCount++;
373+
}
374+
375+
publishBatch.Clear();
376+
publishBatch = null;
377+
}
378+
379+
messagesPublishedTcs.SetResult(true);
380+
}
381+
catch (Exception ex)
382+
{
383+
messagesPublishedTcs.SetException(ex);
384+
}
385+
}
386+
}
387+
});
388+
389+
await channelCreatedTcs.Task.WaitAsync(WaitSpan);
390+
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
391+
await publishTask.WaitAsync(WaitSpan);
392+
}
325393
}
326394
}

projects/Test/Integration/TestToxiproxy.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,9 +105,9 @@ public async Task TestCloseConnection()
105105
}
106106

107107
/*
108-
* Note: using TrySetResult because this callback will be called when the
109-
* test exits, and connectionShutdownTcs will have already been set
110-
*/
108+
* Note: using TrySetResult because this callback will be called when the
109+
* test exits, and connectionShutdownTcs will have already been set
110+
*/
111111
connectionShutdownTcs.TrySetResult(true);
112112
return Task.CompletedTask;
113113
};

0 commit comments

Comments
 (0)