Task optimizations and deadlock fix #687
Conversation
newWorkPool.Start();
}
}
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
Not at all performance sensitive code, so it's much more readable like this, but will revert if needed.
This seems reasonable.
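The pattern in the diff above can be sketched as follows; `WorkPool` and the key type here are simplified stand-ins for the client's real types, not the actual implementation. One subtlety worth a comment: `ConcurrentDictionary.GetOrAdd` may invoke the factory more than once under contention for the same key, keeping only one result.

```csharp
using System;
using System.Collections.Concurrent;

// Minimal self-contained sketch of the GetOrAdd pattern from the diff;
// WorkPool is a stand-in for the client's real work-pool type.
class WorkPool
{
    private readonly ConcurrentQueue<Action> work = new ConcurrentQueue<Action>();
    public void Start() { /* the real code starts the consumer loop here */ }
    public void Enqueue(Action item) => work.Enqueue(item);
}

class WorkService
{
    private readonly ConcurrentDictionary<string, WorkPool> workPools =
        new ConcurrentDictionary<string, WorkPool>();

    public void Schedule(string channelKey, Action work)
    {
        // GetOrAdd may call StartNewWorkPool concurrently for the same key;
        // only one pool is kept, so losing a started pool must be harmless.
        workPools.GetOrAdd(channelKey, StartNewWorkPool).Enqueue(work);
    }

    private WorkPool StartNewWorkPool(string channelKey)
    {
        var pool = new WorkPool();
        pool.Start();
        return pool;
    }
}
```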
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
There is no guarantee that Enqueue and Loop are working on the same messageArrived instance here. This is a lot easier to solve with SemaphoreSlim (Enqueue calls Release to increment it, which releases the await in Loop if the count was at 0), and it gets rid of the 100 ms polling Task.Delay. Much cleaner and more efficient.
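A minimal sketch of the SemaphoreSlim approach described above; the names are illustrative, not the client's actual code. Enqueue releases the semaphore and the loop awaits it, so no timed polling delay is needed:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class AsyncWorkPool
{
    private readonly ConcurrentQueue<Action> queue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim messageArrived = new SemaphoreSlim(0);
    private readonly CancellationTokenSource tokenSource = new CancellationTokenSource();

    public void Enqueue(Action work)
    {
        queue.Enqueue(work);
        messageArrived.Release(); // wakes the loop if it is awaiting
    }

    public async Task Loop()
    {
        while (!tokenSource.IsCancellationRequested)
        {
            // Suspends until Enqueue releases the semaphore -- no Task.Delay polling.
            await messageArrived.WaitAsync(tokenSource.Token).ConfigureAwait(false);
            while (queue.TryDequeue(out Action work))
            {
                work();
            }
        }
    }
}
```

If several Enqueue calls race ahead of the loop, the semaphore count simply accumulates; the extra WaitAsync completions then find an empty queue and fall through harmlessly.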
Makes sense.
The reason I used a TCS here is that it is much faster than the semaphore version: releasing the semaphore on the hot path is up to 7-8 times slower than TrySetResult on the TCS on .NET Framework, and 3-4 times slower on .NET Core 3.1.
.NET Framework without contention
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
TaskCompletionSource | 1.874 ns | 0.0431 ns | 0.0382 ns | 1.00 | 0.00 | - | - | - | - |
SemaphoreSlim | 15.351 ns | 0.0410 ns | 0.0342 ns | 8.21 | 0.18 | - | - | - | - |
.NET Framework with contention
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
TaskCompletionSource | 2.137 ns | 0.0650 ns | 0.0608 ns | 1.00 | 0.00 | - | - | - | - |
SemaphoreSlim | 15.211 ns | 0.2943 ns | 0.2753 ns | 7.13 | 0.29 | - | - | - | - |
.NET Core 3.1 with contention
Method | Mean | Error | StdDev | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
---|---|---|---|---|---|---|---|---|---|
TaskCompletionSource | 4.221 ns | 0.1009 ns | 0.0944 ns | 1.00 | 0.00 | - | - | - | - |
SemaphoreSlim | 13.062 ns | 0.1891 ns | 0.1769 ns | 3.10 | 0.07 | - | - | - | - |
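A BenchmarkDotNet harness along these lines would produce numbers of the kind shown above; this is a hypothetical sketch, not the exact benchmark the author ran, and the method names are illustrative:

```csharp
using System.Threading;
using System.Threading.Tasks;
using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

public class SignalBenchmarks
{
    private SemaphoreSlim semaphore;

    [GlobalSetup]
    public void Setup() => semaphore = new SemaphoreSlim(0);

    [Benchmark(Baseline = true)]
    public bool TcsTrySetResult()
    {
        // A fresh TCS per call so TrySetResult always transitions the task.
        var tcs = new TaskCompletionSource<bool>();
        return tcs.TrySetResult(true);
    }

    [Benchmark]
    public void SemaphoreRelease()
    {
        semaphore.Release();
        semaphore.Wait(); // drain immediately so the count stays bounded
    }
}

public class Program
{
    public static void Main() => BenchmarkRunner.Run<SignalBenchmarks>();
}
```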
Do you want me to use the TCS again and send a PR to fix this, @michaelklishin @lukebakken?
Please submit a PR. Thanks for the benchmarks. I assume that the original purpose of #687 (fixing a potential deadlock) will still apply.
My concern is @stebet's comment:
It also replaces AutoResetEvent and TaskCompletionSource objects with SemaphoreSlim to fix a possible race-condition that could trigger a deadlock
I don't know the specifics of the deadlock so I'm relying on your expertise!
I think he's referring to the TCS not using TaskCreationOptions.RunContinuationsAsynchronously, which we couldn't use at the time this code was written because of the .NET Framework dependency.
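For reference, the option in question is passed at construction time. Without it, TrySetResult may run awaiting continuations inline on the completing thread, which is the classic recipe for this kind of deadlock. A minimal sketch:

```csharp
using System;
using System.Threading.Tasks;

class TcsOptions
{
    static void Main()
    {
        // With this option (available since .NET Framework 4.6), continuations
        // queued on tcs.Task run on the ThreadPool instead of inline on the
        // thread that calls TrySetResult.
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        // Without the option above, this call could execute an 'await tcs.Task'
        // continuation synchronously, on this thread, under any locks it holds.
        tcs.TrySetResult(true);
        Console.WriteLine(tcs.Task.Result); // True
    }
}
```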
Here we go #762
{
tokenSource.Cancel();
return task;
No need to return a task. Fixes the "Necessary Evil" above.
};
m_recoveryThread.Start();

m_recoveryTask = Task.Run(MainRecoveryLoop);
Use Task instead of new Thread. This utilizes the ThreadPool and gets rid of the overhead of creating a new thread; it also enables turning the MainRecoveryLoop method into an async method.
The overhead for this method is unlikely to be significant enough in practice to matter but fair enough :)
Agreed, but using Task.Run also enables MainRecoveryLoop to be defined as "async Task" instead of being forced to be a synchronous method :) It also delegates work to the ThreadPool for better core utilization, hopefully resulting in fewer thread blocks/stalls.
@@ -1027,7 +1023,7 @@ private enum RecoveryConnectionState
/// <summary>
/// This is the main loop for the auto-recovery thread.
/// </summary>
private void MainRecoveryLoop()
private async Task MainRecoveryLoop()
Can now do async since we use Task.Run instead of new Thread.
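The shape of the change, sketched with illustrative names (the real loop reads recovery commands from a queue; this is not the client's actual code):

```csharp
using System;
using System.Threading.Tasks;

class Recoverer
{
    private Task m_recoveryTask;

    public void Init()
    {
        // Before: new Thread(MainRecoveryLoop).Start() forced a dedicated
        // thread and a synchronous loop body.
        // After: the loop runs on the ThreadPool and is free to await.
        m_recoveryTask = Task.Run(MainRecoveryLoop);
    }

    private async Task MainRecoveryLoop()
    {
        // Awaits inside the loop now release the thread back to the pool
        // instead of blocking a dedicated recovery thread.
        await Task.Delay(TimeSpan.FromSeconds(5)).ConfigureAwait(false);
        // ... recovery steps ...
    }
}
```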
@@ -1089,7 +1086,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
}
else
{
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
This had no guarantee of working correctly, or ever even calling the continuation, since the result from Task.Delay was never awaited either synchronously or asynchronously. Fixed by using async.
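The fix boils down to awaiting the delay instead of chaining an unobserved continuation. A sketch using the identifiers from the diff (m_factory and m_recoveryLoopCommandQueue are fields of the surrounding class):

```csharp
// Before: fire-and-forget. The Task returned by ContinueWith is never observed,
// so failures are silently lost and nothing sequences the recovery loop.
//
//   Task.Delay(m_factory.NetworkRecoveryInterval)
//       .ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });

// After: inside the now-async recovery loop, the delay is awaited before the
// command is enqueued, so completion and failures are both observed.
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false);
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);
```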
Good catch!
var taskName = "AMQP Connection " + Endpoint;

#if NETFX_CORE
Task.Factory.StartNew(this.MainLoop, TaskCreationOptions.LongRunning);
#else
var mainLoopThread = new Thread(MainLoop);
mainLoopThread.Name = taskName;
mainLoopThread.IsBackground = useBackgroundThread;
mainLoopThread.Start();
#endif
The NETFX_CORE code is redundant as the compilation symbol is never defined (I haven't removed other instances). Moved to Task.Run instead of new Thread, which uses the ThreadPool and unlocks turning MainLoop into an async Task (this would fit well with System.IO.Pipelines, which I have already started working on in another branch).
readonly AutoResetEvent messageArrived;
readonly TimeSpan waitTime;
Much better to solve with SemaphoreSlim rather than an AutoResetEvent. Cleaner code and more reliable.
};
thread.Start();
#endif
worker = Task.Run(Loop);
Use the ThreadPool rather than a dedicated Thread. This turns Loop into an async Task for better utilization and gets rid of the polling mechanism.
@@ -98,14 +77,13 @@ void Loop()
{
}
}

messageArrived.WaitOne(waitTime);
No need to poll anymore, since the semaphore's WaitAsync takes care of suspending the task until it has something to do. Better core utilization.
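A before/after sketch of the wait itself, with illustrative field names (not the client's actual code):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class LoopSketch
{
    // Before: a blocking, timed wait that forced the loop to wake up and poll
    // even when no message had arrived.
    private readonly AutoResetEvent messageArrivedEvent = new AutoResetEvent(false);
    private readonly TimeSpan waitTime = TimeSpan.FromMilliseconds(100);

    private void OldLoopBody()
    {
        messageArrivedEvent.WaitOne(waitTime); // returns after 100 ms with or without work
    }

    // After: an async wait that suspends the task until Release is called,
    // so no timeout or polling interval is needed.
    private readonly SemaphoreSlim messageArrived = new SemaphoreSlim(0);

    private async Task NewLoopBody()
    {
        await messageArrived.WaitAsync().ConfigureAwait(false);
    }
}
```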
We cannot backport this as it would change the .NET version/standard requirement. Unfortunately for us, this (.NET) user community's read of semantic versioning is the strictest possible and we don't want a pitchfork uprising 👀 Upgrading to 6.0 should be easy enough, though, so I expect most users to do it.
Would it be practical to try to come up with a test that would previously trigger said deadlock? Proving an absence of something is hard, but we can run a workload N times to have some level of assurance.
These changes should actually work down to at least .NET Framework 4.5. I can backport them to the 5.x branch if you want.
Yeah, I'm going to try to do that. I have been making fixes to get the tests to run on my local machine, then I'll create a repro case.
Have now run all tests on my machine successfully (thanks, Docker tools with WSL2 support!). Will now start working on a test case for the deadlock issue.
I have updated the description with a repro example as well as some important notes.
@stebaker92 can you please rebase this on top of
Force-pushed from 2f97275 to 162b410
@michaelklishin I just did that 😄
…able connection and consumers so far. Code cleanups. Removing unneeded dependency.
Force-pushed from fcf1be3 to 66d5966
@stebet I think we can take it from here as follow-up PRs. Thank you very much, this is a substantial contribution to the client 🙏
Glad to hear that. I have a hefty PR with pretty substantial allocation reductions in the works as well :)
@stebet - I tried backporting commit 66d5966 to the I'm wondering if it is worth the effort to continue (@michaelklishin - thoughts?)
I can take a look as well. I'm at NDC in London this week, so I might be sporadic, but it should be very doable.
Some good changes. Just as an FYI: the initial design I made for the async consumer service followed the sync worker service design as closely as possible, to keep the amount of changes as small as possible. Great to see that someone took this code to the next level.
@stebet please see my comment here: #710 (comment) Thanks!
Proposed Changes
This PR fixes some threading/async issues by utilizing Tasks and the ThreadPool instead of dedicated threads. This should result in better core utilization and gets rid of the overhead of creating separate threads. It also replaces AutoResetEvent and TaskCompletionSource objects with SemaphoreSlim to fix a possible race condition that could trigger a deadlock.
These changes work with the 6.x branch but should ideally also be pushed out to 5.x as well to fix the race condition/deadlock issue.
Types of Changes
Checklist
CONTRIBUTING.md document
Further Comments
Have run all tests locally, and below is a program that triggers said deadlock in the current version (5.1.2) of the client.
Important
Moving background work to Tasks + the ThreadPool might impact those who create many more connections than they have cores on their machines (not recommended, by the way). In those cases the ThreadPool only slowly increases the number of threads it will run concurrently beyond the configured minimum (this affects one of the tests, which creates hundreds of connections). Customers who want all those connections to become immediately active might want to call System.Threading.ThreadPool.SetMinThreads; otherwise they slowly ramp up as the ThreadPool sees more work to do. This is well documented here: https://docs.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setminthreads
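For such applications, the ramp-up can be skipped as the linked docs describe. A sketch, where `expectedConnections` is a hypothetical app-specific figure:

```csharp
using System;
using System.Threading;

class PoolTuning
{
    static void Main()
    {
        // Read the current minimums, then raise the worker-thread minimum so
        // the pool injects threads immediately instead of ramping up slowly.
        ThreadPool.GetMinThreads(out int workerThreads, out int completionPortThreads);

        int expectedConnections = 200; // hypothetical, sized to the app's workload
        ThreadPool.SetMinThreads(
            Math.Max(workerThreads, expectedConnections),
            completionPortThreads);
    }
}
```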
Code to reproduce deadlock:
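(The repro program itself is not reproduced here.) For illustration only, and NOT the original repro: the general shape of the hazard is that completing a TCS created without RunContinuationsAsynchronously may run an awaiter's continuation inline on the completing thread, so TrySetResult can block before the completer reaches its follow-up work:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative deadlock shape, not the original repro program.
class DeadlockShape
{
    static void Main()
    {
        var tcs = new TaskCompletionSource<bool>();
        var completerDone = new ManualResetEventSlim(false);

        var waiter = Task.Run(async () =>
        {
            await tcs.Task;       // may continue inline on the completing thread
            completerDone.Wait(); // waits for code that runs after TrySetResult
        });

        Thread.Sleep(100);       // give the awaiter time to register its continuation
        tcs.TrySetResult(true);  // can execute the continuation inline and block here
        completerDone.Set();     // never reached if the continuation ran inline
        waiter.Wait();
    }
}
```

Passing TaskCreationOptions.RunContinuationsAsynchronously to the TCS constructor forces the continuation onto the ThreadPool and removes this failure mode.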