Batching tasks for the AsyncConsumerWorkService to increase throughput. #806


Closed
stebet wants to merge 4 commits

Conversation

@stebet
Contributor

stebet commented Apr 6, 2020

Proposed Changes

This change batches tasks when running the AsyncConsumerWorkService to increase throughput by parallelizing task execution.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)

Checklist

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • All tests pass locally with my changes
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)
  • Any dependent changes have been merged and published in related repositories

@stebet
Contributor Author

stebet commented Apr 6, 2020

Would be nice to get a sanity check from @danielmarbach :) This increased throughput quite a bit on my laptop.

@lukebakken
Collaborator

Could this result in out-of-order operations? One I'm thinking of is the callback for basic.deliver. Consumers expect to get message callbacks called in the order that RabbitMQ delivers them.

@stebet
Contributor Author

stebet commented Apr 6, 2020

I hadn't thought of that. It might, actually, depending on the degree of parallelism. I hadn't considered that consumers would care about message ordering. Perhaps we could make this an option that defaults to false then?

@lukebakken
Collaborator

How much of a gain are we talking about?

@stebet
Contributor Author

stebet commented Apr 6, 2020

It depends on the number of messages and the processing they need. If the consumers take any amount of time handling the messages they receive, the gains can be quite dramatic. A good test would be to run a couple of consumers that take a few milliseconds per message (simulating some IO or DB operation) and see how long it takes them to handle X messages received in a short duration. If they need to run in order, they should take around X * processing-time to finish; in the parallel case, granted they don't care about order, they should ideally take around (X * processing-time) / CPU-core-count.
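
For illustration, here is a minimal, self-contained sketch of that kind of test; the message count and delay are made-up numbers, not measurements. Because the simulated work is IO-bound (Task.Delay), the parallel version finishes in roughly the time of a single handler, while the sequential loop takes roughly messageCount * delay:

using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

class ThroughputSketch
{
    // Simulates a handler that waits on IO (e.g. a DB call) for ~50 ms.
    static Task HandleMessageAsync() => Task.Delay(50);

    static async Task Main()
    {
        const int messageCount = 100;

        var sw = Stopwatch.StartNew();
        for (int i = 0; i < messageCount; i++)
        {
            await HandleMessageAsync(); // in order: roughly messageCount * 50 ms
        }
        Console.WriteLine($"Sequential: {sw.ElapsedMilliseconds} ms");

        sw.Restart();
        // All handlers overlap, so this finishes in roughly the time of the
        // slowest single handler, since the simulated work is not CPU-bound.
        await Task.WhenAll(Enumerable.Range(0, messageCount).Select(_ => HandleMessageAsync()));
        Console.WriteLine($"Parallel:   {sw.ElapsedMilliseconds} ms");
    }
}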

@stebet
Contributor Author

stebet commented Apr 6, 2020

I have added a flag to the ConnectionFactory, called DispatchAsyncConsumersInParallel (defaulting to false), which will dispatch the consumers this way. That way it shouldn't be a breaking change, and consumers can opt in if they want.

@lukebakken
Collaborator

Can we establish an upper bound on the number of parallel tasks?

@stebet
Contributor Author

stebet commented Apr 6, 2020

Can we establish an upper bound on the number of parallel tasks?

That's done automatically (but can be configured) by the .NET Runtime. See here: https://docs.microsoft.com/en-us/dotnet/standard/threading/the-managed-thread-pool

@danielmarbach
Collaborator

I need to have a closer look later; just a quick thought: instead of a bool you could make this setting an int and then use that to set the number of available slots in a SemaphoreSlim to limit the concurrency. Currently this is unbounded concurrency, depending on the thread pool's capacity and ramp-up behavior and the amount of work available, and I think that is a bit dangerous.
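
A minimal sketch of what that could look like, assuming a hypothetical int setting; names like BoundedWorkPool and concurrencyLimit are illustrative and this is not the PR's code:

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class BoundedWorkPool
{
    private readonly SemaphoreSlim _limiter;
    private readonly ConcurrentQueue<Func<Task>> _workQueue = new ConcurrentQueue<Func<Task>>();

    // concurrencyLimit would come from the hypothetical int setting on ConnectionFactory.
    public BoundedWorkPool(int concurrencyLimit)
    {
        _limiter = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
    }

    public void Enqueue(Func<Task> work) => _workQueue.Enqueue(work);

    // Starts each queued work item as soon as a slot is free, so at most
    // concurrencyLimit items run at the same time (no batching, no Task.WhenAll).
    public async Task DrainAsync()
    {
        while (_workQueue.TryDequeue(out Func<Task> work))
        {
            await _limiter.WaitAsync().ConfigureAwait(false);
            _ = RunAsync(work);
        }
    }

    private async Task RunAsync(Func<Task> work)
    {
        try
        {
            await work().ConfigureAwait(false);
        }
        finally
        {
            _limiter.Release();
        }
    }
}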

On the ordering I cannot really give a lot of input, but it feels off that we have a concurrent queue to enqueue work and take it out in the order it was added, just to then add concurrency again with the caveat exposed above. This sounds like a huge tradeoff we are implicitly making that seems to go against the current design of the client as it is today, because if ordering were not a problem we could always have chosen a ConcurrentBag instead of a ConcurrentQueue for getting the work tasks.

@stebet
Contributor Author

stebet commented Apr 6, 2020

I need to have a closer look later; just a quick thought: instead of a bool you could make this setting an int and then use that to set the number of available slots in a SemaphoreSlim to limit the concurrency. Currently this is unbounded concurrency, depending on the thread pool's capacity and ramp-up behavior and the amount of work available, and I think that is a bit dangerous.

Most best practices just say that you should depend on the task scheduler and the thread pool to make the best decisions here. I wonder if it would then need to be two properties: one to say whether you want parallelism, and another to set your own concurrency limit instead of relying on the TaskScheduler. That should be easy to do as well if we want. Not sure how much it'd be used though, unless you expect a big pile-up or batches of tasks, but it's easy to add.

On the ordering I cannot really give a lot of input, but it feels off that we have a concurrent queue to enqueue work and take it out in the order it was added, just to then add concurrency again with the caveat exposed above. This sounds like a huge tradeoff we are implicitly making that seems to go against the current design of the client as it is today, because if ordering were not a problem we could always have chosen a ConcurrentBag instead of a ConcurrentQueue for getting the work tasks.

The queue is currently just used as a "channel" to funnel tasks into. The implementation currently takes tasks from the queue, puts them into a list, and then calls Task.WhenAll on the resulting list. Using a queue has the benefit of TryDequeue, which we could replace with TryTake on a ConcurrentBag.
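
For reference, the batching approach described above boils down to something like this (a simplified sketch with illustrative names, not the actual implementation):

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class BatchingSketch
{
    // Drain everything queued at this point, start it all, then await the whole batch.
    public static async Task ProcessOneBatchAsync(ConcurrentQueue<Func<Task>> workQueue, List<Task> workTasks)
    {
        while (workQueue.TryDequeue(out Func<Task> work))
        {
            workTasks.Add(work());
        }

        await Task.WhenAll(workTasks).ConfigureAwait(false);
        workTasks.Clear();
    }
}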

@danielmarbach
Collaborator

Most best practices just say that you should depend on the task scheduler and the thread pool to make the best decisions here. I wonder if it would then need to be two properties: one to say whether you want parallelism, and another to set your own concurrency limit instead of relying on the TaskScheduler.

In order to make that work you would need to expose the possibility of providing a custom TaskScheduler as a configuration option. This was once added to the connection factory but then removed again. Even if it were re-added (which I would find highly questionable), the implication is that someone would have to implement a custom scheduler, which is hugely complex. That means in the majority of cases TaskScheduler.Default would be used, which means this code runs with unbounded concurrency. If the injection rate of work is high enough, hundreds or thousands of work dispatches can be in flight concurrently. If this is not aligned with the potentially constrained resources in use, this optimization could turn out to be the bottleneck again by overwhelming other, weaker links in the chain. In my point of view you almost never want unbounded concurrency by default; instead, align it with reasonable defaults that can be changed if needed. For example, one reasonable default could be Math.Max(ProcessorCount, 2) or something similar.

Another thing we might want to measure here is the cost of the if on the hot path versus, for example, using the TryAdd and TryTake methods of IProducerConsumerCollection. The interface dispatch might be devirtualized by improvements in .NET and end up more effective than doing the if/else branching on the hot path.
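
For example, something along these lines (a sketch only, with illustrative names; whether the interface dispatch actually wins would need to be measured):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class WorkCollection
{
    private readonly IProducerConsumerCollection<Func<Task>> _work;

    // The ordered/unordered decision happens once, at construction,
    // instead of as an if/else for every work item on the hot path.
    public WorkCollection(bool preserveOrder)
    {
        _work = preserveOrder
            ? (IProducerConsumerCollection<Func<Task>>)new ConcurrentQueue<Func<Task>>()
            : new ConcurrentBag<Func<Task>>();
    }

    public bool TryAdd(Func<Task> work) => _work.TryAdd(work);

    public bool TryTake(out Func<Task> work) => _work.TryTake(out work);
}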

That being said, I would still like to see an in-depth discussion on this PR about the implications of unbounded vs. bounded concurrency, as well as of lifting the ordering constraints in the client's work service and the implications/assumptions that might fall apart in other areas of the code.

@danielmarbach
Collaborator

I realized I might sound like a grumpy old guy who wants to hold up innovation. If I gave that impression, it wasn't my intention at all. My intention is to help guide decision making by mentioning tradeoffs and pros/cons.

@danielmarbach
Collaborator

danielmarbach commented Apr 6, 2020

queue vs bag with parallelism

danielmarbach/MicroBenchmarks@796c59a

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=3.1.101
  [Host]    : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
  MediumRun : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT

Job=MediumRun  IterationCount=15  LaunchCount=2  
WarmupCount=10  
| Method | Mean | Error | StdDev | StdErr | Min | Q1 | Median | Q3 | Max | Op/s | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| ConcurrentQueue_Enqueue | 109.79 us | 7.571 us | 10.858 us | 2.052 us | 93.09 us | 100.80 us | 107.16 us | 118.64 us | 135.82 us | 9,107.9 | 1.00 | 0.00 | 0.7324 | - | - | 11.32 KB |
| ConcurrentBag_Add | 27.17 us | 2.717 us | 3.983 us | 0.740 us | 21.15 us | 24.06 us | 27.48 us | 29.17 us | 39.16 us | 36,811.0 | 0.25 | 0.05 | 0.6714 | - | - | 2.71 KB |

branching vs dispatch

danielmarbach/MicroBenchmarks@3f37f6b

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=3.1.101
  [Host]     : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
  DefaultJob : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT

| Method | Flag | Mean | Error | StdDev | StdErr | Min | Q1 | Median | Q3 | Max | Op/s | Ratio | RatioSD | Gen 0 | Gen 1 | Gen 2 | Allocated |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| Branching | False | 32.36 ns | 2.567 ns | 7.325 ns | 0.755 ns | 17.32 ns | 28.18 ns | 31.50 ns | 35.64 ns | 50.23 ns | 30,899,614.3 | 1.00 | 0.00 | - | - | - | 8 B |
| InterfaceDispatch | False | 25.93 ns | 3.088 ns | 9.056 ns | 0.910 ns | 12.42 ns | 16.72 ns | 26.19 ns | 33.74 ns | 43.11 ns | 38,560,903.0 | 0.82 | 0.34 | - | - | - | 8 B |
| Branching | True | 63.91 ns | 1.346 ns | 2.898 ns | 0.387 ns | 50.14 ns | 63.09 ns | 64.35 ns | 65.42 ns | 68.46 ns | 15,646,945.3 | 1.00 | 0.00 | - | - | - | - |
| InterfaceDispatch | True | 44.23 ns | 6.642 ns | 19.479 ns | 1.958 ns | 23.78 ns | 24.75 ns | 47.26 ns | 59.61 ns | 96.41 ns | 22,607,216.1 | 0.66 | 0.32 | - | - | - | - |

@stebet
Contributor Author

stebet commented Apr 6, 2020

No worries @danielmarbach. I specifically asked for your input because I value it :) PRs and discussions around different implementations are where I feel we really learn.

@stebet
Contributor Author

stebet commented Apr 6, 2020

All excellent points. I'd love to do more experiments with this (so don't merge this yet!).

One thing I feel this has also surfaced is the need for a better configuration mechanism in the library, rather than everything going through the ConnectionFactory.

I also had the thought that a handler pattern like the DelegatingHandlers used with HttpClient might be useful to process/augment messages as they are sent/received. That might provide a good abstraction over a lot of common things like workers and parallelism, loggers, and diagnostic scenarios.
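
Purely as a hypothetical illustration of that idea -- none of these types exist in the client today, the names are made up:

using System.Threading.Tasks;
using RabbitMQ.Client.Events;

// Each handler wraps the next one, so cross-cutting concerns (parallelism limits,
// logging, diagnostics) can be composed into a pipeline around the user's handler.
public abstract class DelegatingMessageHandler
{
    protected DelegatingMessageHandler(DelegatingMessageHandler innerHandler)
    {
        InnerHandler = innerHandler;
    }

    protected DelegatingMessageHandler InnerHandler { get; }

    public virtual Task HandleAsync(object sender, BasicDeliverEventArgs args)
        => InnerHandler?.HandleAsync(sender, args) ?? Task.CompletedTask;
}

// Example: a handler that just logs the delivery tag before passing the message on.
public sealed class LoggingHandler : DelegatingMessageHandler
{
    public LoggingHandler(DelegatingMessageHandler innerHandler) : base(innerHandler) { }

    public override Task HandleAsync(object sender, BasicDeliverEventArgs args)
    {
        System.Console.WriteLine($"Received delivery {args.DeliveryTag}");
        return base.HandleAsync(sender, args);
    }
}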

@lukebakken modified the milestones: 6.0.0, 7.0.0 Apr 6, 2020
@lukebakken
Collaborator

Thanks @danielmarbach. I would like to see a reasonable upper bound on concurrency enforced. This feature is not a must-have for 6.0, so I moved it to 7.0. Once the discussion is ironed out this could be a 6.X feature, as it is backwards-compatible due to its opt-in nature.

@bording
Collaborator

bording commented Apr 6, 2020

Once the discussion is ironed out this could be a 6.X feature, as it is backwards-compatible due to its opt-in nature.

At the moment this is actually a breaking change, because there is a modification to a public interface. That goes back to my statements elsewhere about trying to get rid of those from the public API surface.

volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private Task _worker;
private readonly List<Task> _workTasks = new List<Task>();
@quixoticaxis Apr 9, 2020

_workTasks can be made into a local that is created per loop iteration; it is not used anywhere else as far as I can see. That way it can even be optimized a little by creating it with a predictable default capacity: var tasksToAwaitOnThisIteration = new List<Task>(_workBag.Count).
Is the idea to let the list grow to the size of the biggest batch encountered?

Collaborator

If you do that, then you are making a tradeoff between the list growing and an allocation per iteration, and the GC costs can quickly become higher. If my suggestion of limiting the concurrency is taken into account, the list can have a fixed size and never needs to grow, plus you don't create Gen0 garbage.

@danielmarbach
Collaborator

Let me know if I can help somehow when we start working on this for v7.

@stebet
Contributor Author

stebet commented Apr 14, 2020

Let me know if I can help somehow when we start working on this for v7.

Will do :)

@shaneqld

I don't think the correct approach is to use Task.WhenAll. This will simply batch together tasks that are queued, wait for them all to finish, and batch up the next lot. I think the ideal behavior would be to execute the tasks as they happen, up to some specified concurrency limit.

I've been working on the same problem attempting to change the behavior so async work is executed in parallel. I've tried two approaches:

  1. Modify the library to introduce a new "AsyncParallelConsumerWorkService" (instead of modifying the existing one -- don't want to change something that isn't broken). The implementation uses SemaphoreSlim to limit concurrency (no batching; work is executed immediately if there is enough capacity). I introduced a "ConsumerWorkServiceFactory" member on ConnectionFactory so you can control which work service you use. Unfortunately, wiring this up for use via the public API is hard: references to ConsumerWorkService and AsyncConsumerWorkService are hardcoded, there is no interface to use, and many of the required types that would need to be exposed on the public interface are currently internal.
  2. Write a class that performs parallelisation outside of the library.

If anyone is interested, here is a rough implementation for solution 2:

public class ParallelBasicDeliverEventHandler : IDisposable
{
    private readonly SemaphoreSlim _semaphore;

    public event AsyncEventHandler<BasicDeliverEventArgs> Received;

    public ParallelBasicDeliverEventHandler(int maxConcurrent)
    {
        _semaphore = new SemaphoreSlim(maxConcurrent);
    }

    public Task Handle(object sender, BasicDeliverEventArgs args)
    {
        // copy memory so it's available for later
        args.Body = args.Body.ToArray();

#pragma warning disable CS4014
        // lack of "await" intentional
        HandleInternal(sender, args);
#pragma warning restore CS4014

        return Task.CompletedTask;
    }

    private async Task HandleInternal(object sender, BasicDeliverEventArgs args)
    {
        try
        {
            await _semaphore.WaitAsync();
            try
            {
                if (Received != null)
                {
                    await Received(sender, args);
                }
            }
            finally
            {
                _semaphore.Release();
            }
        }
        catch
        {
            // suppress
        }
    }

    public void Dispose()
    {
        _semaphore.Dispose();
    }
}

And to use:

// how many messages we want to process in parallel (in this case, 10)
const int concurrency = 10;

// configure prefetch so server sends us multiple messages
model.BasicQos(0, concurrency, false);

// wire up consumer
var consumer = new AsyncEventingBasicConsumer(model);
var handler = new ParallelBasicDeliverEventHandler(concurrency);
consumer.Received += handler.Handle;
handler.Received += async (sender, args) =>
{
    Console.WriteLine($"Received {args.DeliveryTag}");
    await Task.Delay(1000);

    var consumer = (AsyncEventingBasicConsumer)sender;
    consumer.Model.BasicAck(args.DeliveryTag, false);
};

// begin consuming
model.BasicConsume("test", false, consumer);

Hope this helps.

@danielmarbach
Collaborator

I think the ideal behavior would be to execute the tasks as they happen, up to some specified concurrency limit.

That is exactly what I have been suggesting instead.

@shaneqld

I think the ideal behavior would be to execute the tasks as they happen, up to some specified concurrency limit.

That is exactly what I have been suggesting instead.

I skimmed through the issue history too quickly! I agree with your points.

Ordering shouldn't matter on the client side. I don't understand how you could guarantee ordering and execute in parallel at the same time.

It would be much safer to explicitly limit parallelism. While unbounded parallelism might be safe if, for example, you didn't autoAck and your prefetch was sane, I don't think it's worth the risk when limiting it is so straightforward.

Regarding the default limit: as we're talking about async, I'm not sure setting it to the processor count is the best default. It would make sense if the handler were doing mostly CPU-intensive work, but I think in reality we're usually waiting around for a response from an external service. That said, it's probably the safest default, as it would cater for those doing CPU-intensive work, while the rest of us will probably want to tweak it anyway. My only other suggestion for the default would be the prefetch count (or some function of it), as I think people generally increase prefetch when they want to go faster.

@danielmarbach
Collaborator

The assumption around the processor count is that, by default, the IO thread pool has a fixed number of threads that corresponds to the concurrency count and never ramps up:

You cannot set the maximum number of worker threads or I/O completion threads to a number smaller than the number of processors on the computer. To determine how many processors are present, retrieve the value of the Environment.ProcessorCount property. In addition, you cannot set the maximum number of worker threads or I/O completion threads to a number smaller than the corresponding minimum number of worker threads or I/O completion threads. To determine the minimum thread pool size, call the GetMinThreads method.

https://docs.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setmaxthreads
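
For reference, a quick way to inspect those pool limits on a given machine:

using System;
using System.Threading;

class ThreadPoolInfo
{
    static void Main()
    {
        // Reports the limits the quoted docs describe for the current process.
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorker, out int maxIo);
        Console.WriteLine($"Processors:    {Environment.ProcessorCount}");
        Console.WriteLine($"Min worker/IO: {minWorker}/{minIo}");
        Console.WriteLine($"Max worker/IO: {maxWorker}/{maxIo}");
    }
}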

Anyway I think your suggestions are great input to consider once we start implementing this for the next version of the client.

@lukebakken
Collaborator

Ordering shouldn't matter on the client side. I don't understand how you could guarantee ordering and execute in parallel at the same time.

Ordering does matter, as it is what users expect by default. This is why "batching tasks" is an opt-in feature.

@shaneqld

To clarify, my comments were in the context of executing multiple work items in parallel. I do agree that the library should use a FIFO queue when dispatching work (as it does). This will give you ordering if you limit concurrent work to 1, but if it's greater than 1, we should expect work to complete out of order.

@danielmarbach
Collaborator

As an idea/spike #866

@lukebakken
Collaborator

This PR is superseded by #866. Thanks everyone!

@lukebakken closed this Jun 24, 2020
@danielmarbach deleted the batchWorkTasks branch June 24, 2020 18:04
@lukebakken modified the milestones: 8.0.0, 7.0.0 Mar 8, 2022
6 participants