Skip to content

Commit ec61e45

Browse files
committed
Modify TestPublisherConfirmationThrottling to use toxiproxy.
1 parent e02734d commit ec61e45

File tree

2 files changed

+104
-68
lines changed

2 files changed

+104
-68
lines changed

projects/Test/Integration/TestBasicPublish.cs

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -322,73 +322,5 @@ public async Task TestPropertiesRoundtrip_Headers()
322322
Assert.Equal(sendBody, consumeBody);
323323
Assert.Equal("World", response);
324324
}
325-
326-
[Fact]
327-
public async Task TestPublisherConfirmationThrottling()
328-
{
329-
const int MaxOutstandingConfirms = 4;
330-
331-
var channelOpts = new CreateChannelOptions
332-
{
333-
PublisherConfirmationsEnabled = true,
334-
PublisherConfirmationTrackingEnabled = true,
335-
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
336-
};
337-
338-
var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
339-
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
340-
Task publishTask = Task.Run(async () =>
341-
{
342-
ConnectionFactory cf = CreateConnectionFactory();
343-
344-
await using (IConnection conn = await cf.CreateConnectionAsync())
345-
{
346-
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
347-
{
348-
QueueDeclareOk q = await ch.QueueDeclareAsync();
349-
350-
channelCreatedTcs.SetResult(true);
351-
352-
int publishCount = 0;
353-
/*
354-
* Note: if batchSize equals MaxOutstandingConfirms,
355-
* a delay is added per-publish and this test takes much longer
356-
* to run. TODO figure out how the heck to test that
357-
*/
358-
int batchSize = MaxOutstandingConfirms / 2;
359-
try
360-
{
361-
while (publishCount < 128)
362-
{
363-
var publishBatch = new List<ValueTask>();
364-
for (int i = 0; i < batchSize; i++)
365-
{
366-
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
367-
}
368-
369-
foreach (ValueTask pt in publishBatch)
370-
{
371-
await pt;
372-
publishCount++;
373-
}
374-
375-
publishBatch.Clear();
376-
publishBatch = null;
377-
}
378-
379-
messagesPublishedTcs.SetResult(true);
380-
}
381-
catch (Exception ex)
382-
{
383-
messagesPublishedTcs.SetException(ex);
384-
}
385-
}
386-
}
387-
});
388-
389-
await channelCreatedTcs.Task.WaitAsync(WaitSpan);
390-
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
391-
await publishTask.WaitAsync(WaitSpan);
392-
}
393325
}
394326
}

projects/Test/Integration/TestToxiproxy.cs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Collections.Generic;
3334
using System.Net;
35+
using System.Threading;
3436
using System.Threading.Tasks;
3537
using Integration;
3638
using RabbitMQ.Client;
@@ -284,6 +286,108 @@ public async Task TestTcpReset_GH1464()
284286
await recoveryTask;
285287
}
286288

289+
[SkippableFact]
290+
[Trait("Category", "Toxiproxy")]
291+
public async Task TestPublisherConfirmationThrottling()
292+
{
293+
Skip.IfNot(AreToxiproxyTestsEnabled, "RABBITMQ_TOXIPROXY_TESTS is not set, skipping test");
294+
295+
const int TotalMessageCount = 64;
296+
const int MaxOutstandingConfirms = 8;
297+
const int BatchSize = MaxOutstandingConfirms * 2;
298+
299+
using var pm = new ToxiproxyManager(_testDisplayName, IsRunningInCI, IsWindows);
300+
await pm.InitializeAsync();
301+
302+
ConnectionFactory cf = CreateConnectionFactory();
303+
cf.Endpoint = new AmqpTcpEndpoint(IPAddress.Loopback.ToString(), pm.ProxyPort);
304+
cf.RequestedHeartbeat = TimeSpan.FromSeconds(5);
305+
cf.AutomaticRecoveryEnabled = true;
306+
307+
var channelOpts = new CreateChannelOptions
308+
{
309+
PublisherConfirmationsEnabled = true,
310+
PublisherConfirmationTrackingEnabled = true,
311+
OutstandingPublisherConfirmationsRateLimiter = new ThrottlingRateLimiter(MaxOutstandingConfirms)
312+
};
313+
314+
var channelCreatedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
315+
var messagesPublishedTcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
316+
long publishCount = 0;
317+
Task publishTask = Task.Run(async () =>
318+
{
319+
await using (IConnection conn = await cf.CreateConnectionAsync())
320+
{
321+
await using (IChannel ch = await conn.CreateChannelAsync(channelOpts))
322+
{
323+
QueueDeclareOk q = await ch.QueueDeclareAsync();
324+
325+
channelCreatedTcs.SetResult(true);
326+
327+
try
328+
{
329+
var publishBatch = new List<ValueTask>();
330+
while (publishCount < TotalMessageCount)
331+
{
332+
for (int i = 0; i < BatchSize; i++)
333+
{
334+
publishBatch.Add(ch.BasicPublishAsync("", q.QueueName, GetRandomBody()));
335+
}
336+
337+
foreach (ValueTask pt in publishBatch)
338+
{
339+
await pt;
340+
Interlocked.Increment(ref publishCount);
341+
}
342+
343+
publishBatch.Clear();
344+
}
345+
346+
messagesPublishedTcs.SetResult(true);
347+
}
348+
catch (Exception ex)
349+
{
350+
messagesPublishedTcs.SetException(ex);
351+
}
352+
}
353+
}
354+
});
355+
356+
await channelCreatedTcs.Task;
357+
358+
const string toxicName = "rmq-localhost-bandwidth";
359+
var bandwidthToxic = new BandwidthToxic();
360+
bandwidthToxic.Name = toxicName;
361+
bandwidthToxic.Attributes.Rate = 0;
362+
bandwidthToxic.Toxicity = 1.0;
363+
bandwidthToxic.Stream = ToxicDirection.DownStream;
364+
365+
await Task.Delay(TimeSpan.FromSeconds(1));
366+
367+
Task<BandwidthToxic> addToxicTask = pm.AddToxicAsync(bandwidthToxic);
368+
369+
while (true)
370+
{
371+
long publishCount0 = Interlocked.Read(ref publishCount);
372+
await Task.Delay(TimeSpan.FromSeconds(5));
373+
long publishCount1 = Interlocked.Read(ref publishCount);
374+
375+
if (publishCount0 == publishCount1)
376+
{
377+
// Publishing has "settled" due to being blocked
378+
break;
379+
}
380+
}
381+
382+
await addToxicTask.WaitAsync(WaitSpan);
383+
await pm.RemoveToxicAsync(toxicName).WaitAsync(WaitSpan);
384+
385+
await messagesPublishedTcs.Task.WaitAsync(WaitSpan);
386+
await publishTask.WaitAsync(WaitSpan);
387+
388+
Assert.Equal(TotalMessageCount, publishCount);
389+
}
390+
287391
private bool AreToxiproxyTestsEnabled
288392
{
289393
get

0 commit comments

Comments
 (0)