Skip to content

Task optimizations and deadlock fix #687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29519.87
VisualStudioVersion = 16.0.29709.97
MinimumVisualStudioVersion = 10.0.40219.1
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "projects", "projects", "{068D7DC3-8E6E-4951-B9E3-272C641BF0DE}"
EndProject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,12 @@ public void Quiesce()

public void Shutdown()
{
// necessary evil
this.workService.Stop().GetAwaiter().GetResult();
workService.Stop();
}

public void Shutdown(IModel model)
{
// necessary evil
this.workService.Stop(model).GetAwaiter().GetResult();
workService.Stop(model);
}

public bool IsShutdown
Expand Down Expand Up @@ -66,10 +64,10 @@ public void HandleBasicCancel(IBasicConsumer consumer, string consumerTag)
public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reason)
{
// the only case where we ignore the shutdown flag.
new ModelShutdown(consumer,reason).Execute(model).GetAwaiter().GetResult();
new ModelShutdown(consumer, reason).Execute(model).GetAwaiter().GetResult();
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
where TWork : Work
{
if (!this.IsShutdown)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,69 +1,55 @@
using System;
using RabbitMQ.Client.Impl;

using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Impl;

namespace RabbitMQ.Client
{
internal sealed class AsyncConsumerWorkService : ConsumerWorkService
{
readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();
private readonly ConcurrentDictionary<IModel, WorkPool> workPools = new ConcurrentDictionary<IModel, WorkPool>();

public void Schedule<TWork>(ModelBase model, TWork work)
where TWork : Work
public void Schedule<TWork>(ModelBase model, TWork work) where TWork : Work
{
// a two-step approach is taken, as TryGetValue does not acquire locks
// if this fails, GetOrAdd is called, which takes a lock

WorkPool workPool;
if (workPools.TryGetValue(model, out workPool) == false)
{
var newWorkPool = new WorkPool(model);
workPool = workPools.GetOrAdd(model, newWorkPool);

// start if it's only the workpool that has been just created
if (newWorkPool == workPool)
{
newWorkPool.Start();
}
}
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not at all performance sensitive code, so it's much more readable like this, but will revert if needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems reasonable.

}

workPool.Enqueue(work);
/// <summary>
/// Value factory handed to <c>workPools.GetOrAdd</c> by <c>Schedule</c>: builds a
/// <see cref="WorkPool"/> for <paramref name="model"/> and calls <c>Start()</c> on it
/// before returning it.
/// </summary>
/// <param name="model">
/// The model the pool is created for. It must be a <see cref="ModelBase"/>, which
/// <c>Schedule</c> guarantees through its own parameter type.
/// </param>
/// <returns>The newly created pool, after <c>Start()</c> has been called on it.</returns>
/// <exception cref="System.InvalidCastException">
/// <paramref name="model"/> is a non-null object that is not a <see cref="ModelBase"/>.
/// </exception>
private WorkPool StartNewWorkPool(IModel model)
{
    // Direct cast rather than "as": a model of the wrong type fails here, at the
    // point of the mistake, instead of handing a null model to the WorkPool.
    var newWorkPool = new WorkPool((ModelBase)model);

    // NOTE(review): ConcurrentDictionary.GetOrAdd does not run its value factory
    // under a lock, so two concurrent first calls for the same model can both get
    // here. The losing pool has Start() called on it but is never stored in
    // workPools, so Stop(model) cannot reach it. Confirm whether that matters.
    newWorkPool.Start();
    return newWorkPool;
}

public async Task Stop(IModel model)
public void Stop(IModel model)
{
WorkPool workPool;
if (workPools.TryRemove(model, out workPool))
if (workPools.TryRemove(model, out WorkPool workPool))
{
await workPool.Stop().ConfigureAwait(false);
workPool.Stop();
}
}

public async Task Stop()
public void Stop()
{
foreach (var model in workPools.Keys)
foreach (IModel model in workPools.Keys)
{
await Stop(model).ConfigureAwait(false);
Stop(model);
}
}

class WorkPool
{
readonly ConcurrentQueue<Work> workQueue;
readonly TimeSpan waitTime;
readonly CancellationTokenSource tokenSource;
readonly ModelBase model;
TaskCompletionSource<bool> messageArrived;
readonly SemaphoreSlim semaphore = new SemaphoreSlim(0);
private Task task;

public WorkPool(ModelBase model)
{
this.model = model;
workQueue = new ConcurrentQueue<Work>();
messageArrived = new TaskCompletionSource<bool>();
waitTime = TimeSpan.FromMilliseconds(100);
tokenSource = new CancellationTokenSource();
}

Expand All @@ -75,30 +61,33 @@ public void Start()
public void Enqueue(Work work)
{
workQueue.Enqueue(work);
messageArrived.TrySetResult(true);
semaphore.Release();
}

async Task Loop()
{
while (tokenSource.IsCancellationRequested == false)
{
Work work;
while (workQueue.TryDequeue(out work))
try
{
await work.Execute(model).ConfigureAwait(false);
await semaphore.WaitAsync(tokenSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException)
{
// Swallowing the task cancellation in case we are stopping work.
}

await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
Comment on lines -91 to -93
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee that Enqueue and Loop are working on the same messageArrived instances here. This is a lot easier to solve with SemaphoreSlim (Enqueue calls Release and increments it, releasing the await in Loop if it's at 0), and gets rid of the 100ms polling Task.Delay. Much cleaner and more efficient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I used a TCS here is that it is much faster than the semaphore version: releasing the semaphore on the hot path is up to 7 to 8 times slower than TrySetResult on the TCS on .NET Framework, and up to 3 to 4 times slower on .NET Core 3.1.

Netframework without contention

Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 1.874 ns 0.0431 ns 0.0382 ns 1.00 0.00 - - - -
SemaphoreSlim 15.351 ns 0.0410 ns 0.0342 ns 8.21 0.18 - - - -

Netframework with contention

7

Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 2.137 ns 0.0650 ns 0.0608 ns 1.00 0.00 - - - -
SemaphoreSlim 15.211 ns 0.2943 ns 0.2753 ns 7.13 0.29 - - - -

Netcore 3.1 with contention

Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 4.221 ns 0.1009 ns 0.0944 ns 1.00 0.00 - - - -
SemaphoreSlim 13.062 ns 0.1891 ns 0.1769 ns 3.10 0.07 - - - -

Do you want me to use the TCS again and send a PR to fix this @michaelklishin @lukebakken ?

See danielmarbach/MicroBenchmarks@516eac9

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please submit a PR. Thanks for the benchmarks. I assume that the original purpose of #687 (fixing a potential deadlock) will still apply.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is @stebet 's comment:

It also replaces AutoResetEvent and TaskCompletionSource objects with SemaphoreSlim to fix a possible race-condition that could trigger a deadlock

I don't know the specifics of the deadlock so I'm relying on your expertise!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think he is talking about the TCS not using TaskCreationOptions.RunContinuationsAsynchronously, which we couldn't use at the time this code was written because of the .NET Framework dependency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we go #762

if (!tokenSource.IsCancellationRequested && workQueue.TryDequeue(out Work work))
{
await work.Execute(model).ConfigureAwait(false);
}
}
}

public Task Stop()
public void Stop()
{
tokenSource.Cancel();
return task;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to return a task. Fixes the "Necessary Evil" above.

}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
//---------------------------------------------------------------------------

using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Impl;

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -93,7 +93,7 @@ public AutorecoveringConnection(ConnectionFactory factory, string clientProvided
m_factory = factory;
this.ClientProvidedName = clientProvidedName;
}

public event EventHandler<EventArgs> RecoverySucceeded
{
add
Expand Down Expand Up @@ -551,12 +551,8 @@ private void Init(IFrameHandler fh)
{
m_delegate = new Connection(m_factory, false,
fh, this.ClientProvidedName);

m_recoveryThread = new Thread(MainRecoveryLoop)
{
IsBackground = true
};
m_recoveryThread.Start();

m_recoveryTask = Task.Run(MainRecoveryLoop);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use Task instead of new Thread. This utilizes the ThreadPool and gets rid of the overhead of creating a new thread; it also enables turning the MainRecoveryLoop method into an async method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overhead for this method is unlikely to be significant enough in practice to matter but fair enough :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but using Task.Run also enables the MainRecoveryLoop to be defined as "async Task" instead of being forced to be a synchronous method :) It also delegates the work to the ThreadPool for better core utilization, hopefully resulting in fewer thread blocks/stalls.


EventHandler<ShutdownEventArgs> recoveryListener = (_, args) =>
{
Expand Down Expand Up @@ -587,7 +583,7 @@ public void UpdateSecret(string newSecret, string reason)
public void Abort()
{
StopRecoveryLoop();
if(m_delegate.IsOpen)
if (m_delegate.IsOpen)
m_delegate.Abort();
}

Expand All @@ -608,7 +604,7 @@ public void Abort(TimeSpan timeout)
m_delegate.Abort(timeout);
}
}

///<summary>API-side invocation of connection abort with timeout.</summary>
public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
{
Expand Down Expand Up @@ -980,7 +976,7 @@ private bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args)
// connectivity loss or abrupt shutdown
args.Initiator == ShutdownInitiator.Library);
}

private enum RecoveryCommand
{
/// <summary>
Expand All @@ -1007,7 +1003,7 @@ private enum RecoveryConnectionState
Recovering
}

private Thread m_recoveryThread;
private Task m_recoveryTask;
private RecoveryConnectionState m_recoveryLoopState = RecoveryConnectionState.Connected;

private readonly BlockingCollection<RecoveryCommand> m_recoveryLoopCommandQueue = new BlockingCollection<RecoveryCommand>();
Expand All @@ -1017,7 +1013,7 @@ private enum RecoveryConnectionState
/// <summary>
/// This is the main loop for the auto-recovery thread.
/// </summary>
private void MainRecoveryLoop()
private async Task MainRecoveryLoop()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can now do async since we use Task.Run instead of new Thread.

{
try
{
Expand All @@ -1026,10 +1022,10 @@ private void MainRecoveryLoop()
switch (m_recoveryLoopState)
{
case RecoveryConnectionState.Connected:
RecoveryLoopConnectedHandler(command);
await RecoveryLoopConnectedHandler(command).ConfigureAwait(false);
break;
case RecoveryConnectionState.Recovering:
RecoveryLoopRecoveringHandler(command);
await RecoveryLoopRecoveringHandler(command).ConfigureAwait(false);
break;
default:
ESLog.Warn("RecoveryLoop state is out of range.");
Expand All @@ -1045,6 +1041,7 @@ private void MainRecoveryLoop()
{
ESLog.Error("Main recovery loop threw unexpected exception.", e);
}

m_recoveryLoopComplete.SetResult(0);
}

Expand All @@ -1065,7 +1062,7 @@ private void StopRecoveryLoop()
/// Handles commands when in the Recovering state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
private async Task RecoveryLoopRecoveringHandler(RecoveryCommand command)
{
switch (command)
{
Expand All @@ -1079,7 +1076,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
}
else
{
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This had no guarantee of working correctly, or ever even calling the continuation, since the result from Task.Delay was never awaited either synchronously or asynchronously. Fixed by using async.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

await Task.Delay(m_factory.NetworkRecoveryInterval);
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
}

break;
Expand All @@ -1093,7 +1091,7 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
/// Handles commands when in the Connected state.
/// </summary>
/// <param name="command"></param>
private void RecoveryLoopConnectedHandler(RecoveryCommand command)
private async Task RecoveryLoopConnectedHandler(RecoveryCommand command)
{
switch (command)
{
Expand All @@ -1102,7 +1100,8 @@ private void RecoveryLoopConnectedHandler(RecoveryCommand command)
break;
case RecoveryCommand.BeginAutomaticRecovery:
m_recoveryLoopState = RecoveryConnectionState.Recovering;
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false);
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
break;
default:
ESLog.Warn($"RecoveryLoop command {command} is out of range.");
Expand Down
Loading