Skip to content

Commit f397358

Browse files
committed
Optimizing the multiple-confirm logic, pulling it out into it's own task that's only triggered when needed.
1 parent 34fb4bc commit f397358

File tree

3 files changed

+69
-12
lines changed

3 files changed

+69
-12
lines changed

RabbitMQDotNetClient.sln

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@ EndProject
1515
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
1616
ProjectSection(SolutionItems) = preProject
1717
.editorconfig = .editorconfig
18+
docs\specs\amqp0-9-1.xml = docs\specs\amqp0-9-1.xml
1819
EndProjectSection
1920
EndProject
20-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "RabbitMQTest", "RabbitMQTest\RabbitMQTest.csproj", "{3CFAC019-8281-48AD-8925-16CE8EC3CE50}"
21+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQTest", "RabbitMQTest\RabbitMQTest.csproj", "{3CFAC019-8281-48AD-8925-16CE8EC3CE50}"
2122
EndProject
2223
Global
2324
GlobalSection(SolutionConfigurationPlatforms) = preSolution

RabbitMQTest/Program.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ class Program
1515
private static int itemsPerBatch = 500;
1616
static async Task Main(string[] args)
1717
{
18-
ThreadPool.SetMinThreads(16 * Environment.ProcessorCount, 16 * Environment.ProcessorCount);
18+
Console.ReadLine();
1919
var connectionString = new Uri("amqp://guest:guest@localhost/");
2020

2121
var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
@@ -32,9 +32,9 @@ static async Task Main(string[] args)
3232
var asyncListener = new AsyncEventingBasicConsumer(subscriber);
3333
asyncListener.Received += AsyncListener_Received;
3434
subscriber.QueueBind("testqueue", "test", "myawesome.routing.key");
35-
subscriber.BasicConsume("testqueue", true, "testconsumer", asyncListener);
35+
subscriber.BasicConsume("testqueue", false, "testconsumer", asyncListener);
3636

37-
byte[] payload = new byte[16384];
37+
byte[] payload = new byte[512];
3838
var batchPublish = Task.Run(async () =>
3939
{
4040
while (messagesSent < batchesToSend * itemsPerBatch)
@@ -78,6 +78,7 @@ static async Task Main(string[] args)
7878
});
7979

8080
await Task.WhenAll(sentTask, receivedTask);
81+
Console.ReadLine();
8182
}
8283

8384
private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
@@ -90,6 +91,7 @@ private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs
9091
// Moving to better synchronization constructs solves the issue, and using the ThreadPool
9192
// is standard practice as well to maximize core utilization and reduce overhead of Thread creation
9293
Interlocked.Increment(ref messagesReceived);
94+
(sender as AsyncDefaultBasicConsumer).Model.BasicAck(@event.DeliveryTag, true);
9395
return Task.CompletedTask;
9496
}
9597
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ public abstract class ModelBase : IFullModel, IRecoverable
7575

7676
//private readonly SynchronizedList<ulong> m_unconfirmedSet = new SynchronizedList<ulong>();
7777
private readonly ConcurrentDictionary<ulong, bool> m_unconfirmedSet = new ConcurrentDictionary<ulong, bool>();
78+
private ulong _highestDeliveryTag = 0;
79+
private Task _multipleConfirmCleanup;
80+
private SemaphoreSlim _multipleConfirmLock = new SemaphoreSlim(1, 1);
81+
private readonly CancellationTokenSource _connectionClosingCancellation = new CancellationTokenSource();
7882
private readonly SemaphoreSlim _outstandingSemaphore = new SemaphoreSlim(0);
7983

8084
private EventHandler<EventArgs> m_basicRecoverOk;
@@ -114,6 +118,7 @@ protected void Initialise(ISession session)
114118
Session = session;
115119
Session.CommandReceived = HandleCommand;
116120
Session.SessionShutdown += OnSessionShutdown;
121+
_multipleConfirmCleanup = Task.Run(CleanupUnconfirmedTagsAsync);
117122
}
118123

119124
public TimeSpan HandshakeContinuationTimeout
@@ -268,6 +273,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
268273
{
269274
var k = new ShutdownContinuation();
270275
ModelShutdown += k.OnConnectionShutdown;
276+
_connectionClosingCancellation.Cancel();
271277

272278
try
273279
{
@@ -278,6 +284,7 @@ public void Close(ShutdownEventArgs reason, bool abort)
278284
}
279285
k.Wait(TimeSpan.FromMilliseconds(10000));
280286
ConsumerDispatcher.Shutdown(this);
287+
_multipleConfirmCleanup.Wait();
281288
}
282289
catch (AlreadyClosedException)
283290
{
@@ -422,6 +429,13 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
422429
}
423430
}
424431

432+
public async ValueTask<MethodBase> ModelRpcAsync(MethodBase method, ContentHeaderBase header, byte[] body)
433+
{
434+
var k = new AsyncRpcContinuation();
435+
TransmitAndEnqueue(new Command(method, header, body), k);
436+
return (await k.GetReplyAsync(this.ContinuationTimeout).ConfigureAwait(false)).Method;
437+
}
438+
425439
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
426440
{
427441
if (method.HasContent)
@@ -1574,23 +1588,63 @@ protected virtual void handleAckNack(ulong deliveryTag, bool multiple, bool isNa
15741588
{
15751589
if (multiple)
15761590
{
1577-
foreach (ulong key in m_unconfirmedSet.Keys)
1591+
if (_highestDeliveryTag < deliveryTag)
15781592
{
1579-
if (key <= deliveryTag)
1593+
// Multiple confirms, let's trigger a cleanup of any older deliveries.
1594+
_highestDeliveryTag = deliveryTag;
1595+
try
15801596
{
1581-
m_unconfirmedSet.TryRemove(key, out _);
1597+
_multipleConfirmLock.Release();
1598+
}
1599+
catch (SemaphoreFullException)
1600+
{
1601+
// Ignore if we are trying to release often.
15821602
}
15831603
}
15841604
}
1585-
else
1586-
{
1587-
m_unconfirmedSet.TryRemove(deliveryTag, out _);
1588-
}
1605+
1606+
m_unconfirmedSet.TryRemove(deliveryTag, out _);
15891607

15901608
m_onlyAcksReceived = m_onlyAcksReceived && !isNack;
1609+
TriggerAllOutstandingCompleted();
1610+
}
1611+
1612+
private void TriggerAllOutstandingCompleted()
1613+
{
15911614
if (m_unconfirmedSet.Count == 0)
15921615
{
1593-
_outstandingSemaphore.Release();
1616+
try
1617+
{
1618+
_outstandingSemaphore.Release();
1619+
}
1620+
catch (SemaphoreFullException)
1621+
{
1622+
// Swallow the semaphore full exception.
1623+
}
1624+
}
1625+
}
1626+
1627+
public async Task CleanupUnconfirmedTagsAsync()
1628+
{
1629+
while (!_connectionClosingCancellation.IsCancellationRequested)
1630+
{
1631+
try
1632+
{
1633+
await _multipleConfirmLock.WaitAsync(_connectionClosingCancellation.Token).ConfigureAwait(false);
1634+
foreach (ulong key in m_unconfirmedSet.Keys)
1635+
{
1636+
if (key <= _highestDeliveryTag)
1637+
{
1638+
m_unconfirmedSet.TryRemove(key, out _);
1639+
}
1640+
}
1641+
1642+
TriggerAllOutstandingCompleted();
1643+
}
1644+
catch (TaskCanceledException)
1645+
{
1646+
// Swallow the task cancel exception since the model is being closed.
1647+
}
15941648
}
15951649
}
15961650

0 commit comments

Comments
 (0)