
Task optimizations and deadlock fix #687


Merged
merged 3 commits into rabbitmq:master on Jan 22, 2020

Conversation

@stebet (Contributor) commented Jan 16, 2020

Proposed Changes

This PR fixes some threading/async issues by utilizing Tasks and the ThreadPool instead of dedicated threads. This should result in better core utilization and removes the overhead of creating separate threads. It also replaces AutoResetEvent and TaskCompletionSource objects with SemaphoreSlim to fix a possible race condition that could trigger a deadlock.

These changes work with the 6.x branch but should ideally be backported to 5.x as well to fix the race condition/deadlock issue.

Types of Changes

  • Bug fix (non-breaking change which fixes a possible deadlock issue)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

Further Comments

I have run all tests locally, and below is a program that triggers said deadlock in the current version (5.1.2) of the client.

Important

Moving background work to Tasks and the ThreadPool might affect users who create many more connections than they have cores on their machines (not recommended, by the way). In that case the ThreadPool only gradually increases the number of threads it will run concurrently beyond the configured minimum (this affects one of the tests that creates hundreds of connections). Such users may want to call System.Threading.ThreadPool.SetMinThreads if they need all of those connections to become active immediately; otherwise they ramp up slowly as the ThreadPool sees more queued work. This is documented here: https://docs.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setminthreads
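
For illustration, a minimal sketch of raising the ThreadPool minimum before opening many connections. The 16 * ProcessorCount multiplier mirrors the commented-out line in the repro below; it is an example, not a recommendation:

using System;
using System.Threading;

class ThreadPoolWarmup
{
    static void Warmup()
    {
        // Raise the minimum worker and IOCP thread counts so the ThreadPool
        // does not ramp up slowly when hundreds of connections start at once.
        ThreadPool.GetMinThreads(out int workers, out int iocp);
        ThreadPool.SetMinThreads(
            Math.Max(workers, 16 * Environment.ProcessorCount),
            Math.Max(iocp, 16 * Environment.ProcessorCount));
    }
}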

Code to reproduce deadlock:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

using System;
using System.Threading;
using System.Threading.Tasks;

namespace DeadlockRabbitMQ
{
    class Program
    {
        private static int messagesSent = 0;
        private static int messagesReceived = 0;
        private static int batchesToSend = 1000;
        private static int itemsPerBatch = 500;
        static async Task Main(string[] args)
        {
            //ThreadPool.SetMinThreads(16 * Environment.ProcessorCount, 16 * Environment.ProcessorCount);
            var connectionString = new Uri("amqp://guest:guest@localhost/");

            var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
            var connection = connectionFactory.CreateConnection();
            var connection2 = connectionFactory.CreateConnection();
            var publisher = connection.CreateModel();
            var subscriber = connection2.CreateModel();
            publisher.ConfirmSelect();
            subscriber.ConfirmSelect();

            publisher.ExchangeDeclare("test", ExchangeType.Topic, true);

            subscriber.QueueDeclare("testqueue", true, false, true);
            var asyncListener = new AsyncEventingBasicConsumer(subscriber) { ConsumerTag = "testconsumer" };
            asyncListener.Received += AsyncListener_Received;
            subscriber.QueueBind("testqueue", "test", "myawesome.routing.key");
            subscriber.BasicConsume("testqueue", false, asyncListener.ConsumerTag, asyncListener);

            var batchPublish = Task.Run(() =>
            {
                while (messagesSent < batchesToSend * itemsPerBatch)
                {
                    var batch = publisher.CreateBasicPublishBatch();
                    for (int i = 0; i < itemsPerBatch; i++)
                    {
                        var properties = publisher.CreateBasicProperties();
                        properties.AppId = "testapp";
                        properties.CorrelationId = Guid.NewGuid().ToString();
                        batch.Add("test", "myawesome.routing.key", true, properties, BitConverter.GetBytes(i + messagesSent));
                    }
                    batch.Publish();
                    publisher.WaitForConfirmsOrDie();
                    messagesSent += itemsPerBatch;
                }
            });

            while (messagesReceived < batchesToSend * itemsPerBatch)
            {
                Console.WriteLine($"Messages received: {messagesReceived}");

                await Task.Delay(500);
            }

            await batchPublish;

            Console.WriteLine("Done receiving all messages.");
            Console.ReadLine();
            subscriber.Dispose();
            publisher.Dispose();
            connection.Dispose();
            connection2.Dispose();
        }

        private static async Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
        {
            // Doing things in parallel here is what will eventually trigger the deadlock,
            // probably due to a race condition in AsyncConsumerWorkService.Loop, although
            // I've had trouble pinpointing it exactly, but due to how the code in there uses
            // a TaskCompletionSource, and elsewhere overrides it, it might cause Enqueue and Loop
            // to eventually be working with different references, or that's at least the current theory.
            // Moving to better synchronization constructs solves the issue, and using the ThreadPool
            // is standard practice as well to maximize core utilization and reduce overhead of Thread creation
            await Task.WhenAll(IncrementCounter().AsTask(), WriteToConsole(@event.BasicProperties.CorrelationId, @event.Body).AsTask());
            (sender as AsyncEventingBasicConsumer).Model.BasicAck(@event.DeliveryTag, false);
        }

        private static ValueTask IncrementCounter()
        {
            Interlocked.Increment(ref messagesReceived);
            return new ValueTask();
        }

        private static ValueTask WriteToConsole(string correlationId, byte[] body)
        {
            Console.WriteLine($"Received event {correlationId} with value: {BitConverter.ToInt32(body, 0)}");
            return new ValueTask();
        }
    }
}

newWorkPool.Start();
}
}
workPools.GetOrAdd(model, StartNewWorkPool).Enqueue(work);
Contributor Author:

Not at all performance-sensitive code, so it's much more readable like this, but I will revert if needed.

Contributor:

This seems reasonable.

Comment on lines -91 to -93
await Task.WhenAny(Task.Delay(waitTime, tokenSource.Token), messageArrived.Task).ConfigureAwait(false);
messageArrived.TrySetResult(true);
messageArrived = new TaskCompletionSource<bool>();
Contributor Author:

There is no guarantee that Enqueue and Loop are working with the same messageArrived instance here. This is much easier to solve with a SemaphoreSlim (Enqueue calls Release to increment the count, which releases the await in Loop if it is waiting at 0), and it gets rid of the 100 ms polling Task.Delay. Much cleaner and more efficient.
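
Roughly the pattern being described, as a simplified sketch (the class and member names are illustrative, not the actual WorkPool code):

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class WorkPoolSketch
{
    private readonly ConcurrentQueue<Action> _workQueue = new ConcurrentQueue<Action>();
    private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0);
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public void Enqueue(Action work)
    {
        _workQueue.Enqueue(work);
        _semaphore.Release(); // wakes Loop if it is awaiting at count 0
    }

    public async Task Loop()
    {
        try
        {
            while (!_cts.IsCancellationRequested)
            {
                // Suspends without polling until Enqueue signals new work.
                await _semaphore.WaitAsync(_cts.Token).ConfigureAwait(false);
                while (_workQueue.TryDequeue(out Action work))
                {
                    work();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Stop() was requested; exit the loop.
        }
    }

    public void Stop() => _cts.Cancel();
}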

Contributor:

Makes sense.

Collaborator:

The reason I used a TCS here is that it is much faster than the semaphore version: releasing the semaphore on the hot path is up to 7-8 times slower than TrySetResult on the TCS on NETFRAMEWORK and up to 3-4 times slower on NETCORE 3.1.

Netframework without contention

Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 1.874 ns 0.0431 ns 0.0382 ns 1.00 0.00 - - - -
SemaphoreSlim 15.351 ns 0.0410 ns 0.0342 ns 8.21 0.18 - - - -

Netframework with contention


Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 2.137 ns 0.0650 ns 0.0608 ns 1.00 0.00 - - - -
SemaphoreSlim 15.211 ns 0.2943 ns 0.2753 ns 7.13 0.29 - - - -

Netcore 3.1 with contention

Method Mean Error StdDev Ratio RatioSD Gen 0 Gen 1 Gen 2 Allocated
TaskCompletionSource 4.221 ns 0.1009 ns 0.0944 ns 1.00 0.00 - - - -
SemaphoreSlim 13.062 ns 0.1891 ns 0.1769 ns 3.10 0.07 - - - -

Do you want me to use the TCS again and send a PR to fix this, @michaelklishin @lukebakken?

See danielmarbach/MicroBenchmarks@516eac9

Collaborator:

Please submit a PR. Thanks for the benchmarks. I assume that the original purpose of #687 (fixing a potential deadlock) will still apply.

Collaborator:

My concern is @stebet 's comment:

It also replaces AutoResetEvent and TaskCompletionSource objects with SemaphoreSlim to fix a possible race-condition that could trigger a deadlock

I don't know the specifics of the deadlock so I'm relying on your expertise!

Collaborator:

I think he is talking about the TCS not using TaskCreationOptions.RunContinuationsAsynchronously, which we couldn't use at the time of writing this code because of the netframework dependency.
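
For context, the option being referred to, available on newer target frameworks (a minimal sketch):

using System.Threading.Tasks;

// Without this flag, TrySetResult can run awaiting continuations inline on the
// signalling thread, which is one way this kind of code can deadlock.
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
tcs.TrySetResult(true); // continuations are queued to the ThreadPool instead of running inline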

Collaborator:

Here we go #762

{
tokenSource.Cancel();
return task;
Contributor Author:

No need to return a task. Fixes the "Necessary Evil" above.

};
m_recoveryThread.Start();

m_recoveryTask = Task.Run(MainRecoveryLoop);
Contributor Author:

Use a Task instead of a new Thread. This utilizes the ThreadPool, gets rid of the overhead of creating a new thread, and also enables turning the MainRecoveryLoop method into an async method.

Contributor:

The overhead for this method is unlikely to be significant enough in practice to matter but fair enough :)

Contributor Author:

Agreed, but using Task.Run also enables MainRecoveryLoop to be defined as "async Task" instead of being forced to be a synchronous method :) It also delegates the work to the ThreadPool for better core utilization, hopefully resulting in fewer thread blocks/stalls.
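
In code terms, a sketch using the names from the diff above (the actual loop body differs):

// Before: a dedicated thread forces MainRecoveryLoop to stay synchronous.
// m_recoveryThread = new Thread(MainRecoveryLoop);
// m_recoveryThread.Start();

// After: the loop runs on the ThreadPool and may await.
m_recoveryTask = Task.Run(MainRecoveryLoop);

private async Task MainRecoveryLoop()
{
    // The body can now await instead of blocking a dedicated thread, e.g.:
    await Task.Delay(TimeSpan.FromSeconds(1)).ConfigureAwait(false);
}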

@@ -1027,7 +1023,7 @@ private enum RecoveryConnectionState
/// <summary>
/// This is the main loop for the auto-recovery thread.
/// </summary>
private void MainRecoveryLoop()
private async Task MainRecoveryLoop()
Contributor Author:

Can now do async since we use Task.Run instead of new Thread.

@@ -1089,7 +1086,8 @@ private void RecoveryLoopRecoveringHandler(RecoveryCommand command)
}
else
{
Task.Delay(m_factory.NetworkRecoveryInterval).ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });
Contributor Author:

This had no guarantee of working correctly, or even of ever calling the continuation, since the Task returned by Task.Delay was never awaited, either synchronously or asynchronously. Fixed by using async/await.
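
Schematically, what "fixed by using async/await" means here (a sketch; the actual change may differ in detail):

// Before: fire-and-forget; nothing observes the continuation or any exception it throws.
// Task.Delay(m_factory.NetworkRecoveryInterval)
//     .ContinueWith(t => { m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery); });

// After: the delay is awaited inside the now-async recovery loop, so the command
// is enqueued deterministically and any failure surfaces on the awaited task.
await Task.Delay(m_factory.NetworkRecoveryInterval).ConfigureAwait(false);
m_recoveryLoopCommandQueue.TryAdd(RecoveryCommand.PerformAutomaticRecovery);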

Contributor:

Good catch!

Comment on lines 1050 to 1059
var taskName = "AMQP Connection " + Endpoint;

#if NETFX_CORE
Task.Factory.StartNew(this.MainLoop, TaskCreationOptions.LongRunning);
#else
var mainLoopThread = new Thread(MainLoop);
mainLoopThread.Name = taskName;
mainLoopThread.IsBackground = useBackgroundThread;
mainLoopThread.Start();
#endif
Contributor Author:

The NETFX_CORE code is redundant as the compilation symbol is never defined (I haven't removed other instances). Moved to Task.Run instead of a new Thread, which uses the ThreadPool and unlocks turning MainLoop into an async Task (this would fit well with System.IO.Pipelines, which I have already started working on in another branch).

Comment on lines -52 to -53
readonly AutoResetEvent messageArrived;
readonly TimeSpan waitTime;
Contributor Author:

Much better solved with a SemaphoreSlim than an AutoResetEvent. Cleaner code and more reliable.

};
thread.Start();
#endif
worker = Task.Run(Loop);
Contributor Author:

Use the ThreadPool rather than a dedicated Thread. This turns the Loop into an async Task, for better utilization, and gets rid of the polling mechanism.

@@ -98,14 +77,13 @@ void Loop()
{
}
}

messageArrived.WaitOne(waitTime);
Contributor Author:

No need to poll anymore, since the Semaphore + WaitAsync takes care of suspending the task until it has something to do. Better core utilization.

@michaelklishin (Contributor):

We cannot backport this as it would change the .NET version/standard requirement. Unfortunately for us this (.NET) user community's read of semantic versioning is the strictest possible and we don't want a pitchfork uprising 👀

Upgrading to 6.0 should be easy enough, though, so I expect most users to do it.

@michaelklishin (Contributor):

Would it be practical to try to come up with a test that would previously trigger said deadlock? Proving an absence of something is hard but we can run a workload N times to have some level of assurance.

@stebet (Contributor Author) commented Jan 16, 2020

We cannot backport this as it would change the .NET version/standard requirement. Unfortunately for us this (.NET) user community's read of semantic versioning is the strictest possible and we don't want a pitchfork uprising 👀

Upgrading to 6.0 should be easy enough, though, so I expect most users to do it.

These changes should actually work down to at least .NET Framework 4.5. I can backport them to the 5.x branch if you want.

@stebet (Contributor Author) commented Jan 16, 2020

Would it be practical to try to come up with a test that would previously trigger said deadlock? Proving an absence of something is hard but we can run a workload N times to have some level of assurance.

Yeah, I'm going to try to do that. Have been making fixes to get the tests to run on my local machine, then I'll create a repro case.

@stebet (Contributor Author) commented Jan 16, 2020

Have now run all tests on my machine successfully (thanks Docker tools with WSL2 support!). Will now start working on a test case for the deadlock issue.

@lukebakken self-requested a review January 16, 2020 15:14
@stebet (Contributor Author) commented Jan 17, 2020

I have updated the description with a repro example as well as some important notes.

@michaelklishin (Contributor):

@stebaker92 can you please rebase this on top of master?

@lukebakken (Collaborator):

@michaelklishin I just did that 😄

…able connection and consumers so far.

Code cleanups. Removing unneeded dependency.
@michaelklishin merged commit f171e16 into rabbitmq:master Jan 22, 2020
@michaelklishin (Contributor):

@stebet I think we can take it from here as follow-up PRs. Thank you very much, this is a substantial contribution to the client 🙏

@stebet (Contributor Author) commented Jan 22, 2020

Glad to hear that. I have a hefty PR with pretty substantial allocation reductions in the works as well :)

@lukebakken (Collaborator):

@stebet - I tried backporting commit 66d5966 to the 5.x branch but it wasn't pretty and does not yet compile due to too many name changes between 5.x and master. You can see my WIP PR here:

#699

I'm wondering if it is worth the effort to continue (@michaelklishin - thoughts?)

@stebet (Contributor Author) commented Jan 28, 2020

I can take a look as well. I'm at NDC in London this week, so my responses might be sporadic, but it should be very doable.

@lukebakken (Collaborator):

@stebet - don't worry about it. I need to carefully review the commits to master since the last 5.x release and backport more than those that were part of #687. I look forward to seeing your upcoming contributions.

@danielmarbach (Collaborator):

Some good changes. Just as an FYI: the initial design I made for the async consumer service followed the sync worker service design as closely as possible to keep the amount of change small. Great to see that someone took this code to the next level.

@lukebakken (Collaborator):

@stebet please see my comment here: #710 (comment)

Thanks!
