Batching tasks for the AsyncConsumerWorkService to increase throughput. #806
Conversation
Would be nice to get a sanity check from @danielmarbach :) This increased throughput quite a bit on my laptop.

Could this result in out-of-order operations? One I'm thinking of is the callback for

I hadn't thought of that. It might, depending on the degree of parallelism; I hadn't considered that consumers would care about message ordering. Perhaps we could make this an option that defaults to false?

How much of a gain are we talking about?

It depends on the number of messages and the processing each one needs. If the consumers take any amount of time handling the messages they receive, the gains can be quite dramatic. A good test would be to run a couple of consumers that take a few milliseconds each (simulating some IO or DB operation) and see how long it takes them to handle X messages received in a short burst. If they have to process in order, they should take around X * processing-time to finish; in the parallel case, provided they don't care about order, they should ideally land around (X * processing-time) / CPU-core-count.

I have added a flag to the ConnectionFactory, called DispatchAsyncConsumersInParallel (defaulting to false), which dispatches the consumers this way. That way it shouldn't be a breaking change and consumers can opt in if they want.
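A minimal usage sketch of the opt-in described above. The `DispatchAsyncConsumersInParallel` property is the flag proposed in this PR and is hypothetical until merged; everything else is the existing client API:

```csharp
using RabbitMQ.Client;

// Hypothetical opt-in sketch: DispatchAsyncConsumersInParallel is the flag proposed
// in this PR (defaults to false); DispatchConsumersAsync is the existing switch that
// enables async consumers.
var factory = new ConnectionFactory
{
    DispatchConsumersAsync = true,
    DispatchAsyncConsumersInParallel = true
};

using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
// Register an AsyncEventingBasicConsumer as usual; with the flag enabled, queued
// handler invocations may run in parallel rather than strictly one at a time.
```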
Can we establish an upper bound on the number of parallel tasks?
That's done automatically (but it can be configured) by the .NET runtime. See here: https://docs.microsoft.com/en-us/dotnet/standard/threading/the-managed-thread-pool

I need to have a closer look later, but a quick thought: instead of a bool you could make this setting an int and use it to set the number of available slots in a SemaphoreSlim that limits the concurrency. Currently this is unbounded concurrency, depending on the thread pool's capacity and ramp-up behaviour and on how much work is available, and I think that is a bit dangerous.

On the ordering I cannot really give a lot of input, but it feels off that we have a concurrent queue to enqueue and dequeue work in order, only to then add concurrency on top of it with the caveat that exposes. That sounds like a big tradeoff we are implicitly making, and it goes against the current design of the client as it is today: if ordering were not a concern, we could always have chosen a ConcurrentBag instead of a concurrent queue for the work tasks.
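A minimal sketch of the bounded-concurrency suggestion above, assuming an illustrative `BoundedDispatcher` type; none of these names are part of the client:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only: an int setting feeds the number of slots in a SemaphoreSlim,
// so at most `concurrencyLimit` work items execute at the same time.
public sealed class BoundedDispatcher : IDisposable
{
    private readonly SemaphoreSlim _slots;

    public BoundedDispatcher(int concurrencyLimit)
    {
        _slots = new SemaphoreSlim(concurrencyLimit, concurrencyLimit);
    }

    public async Task DispatchAsync(Func<Task> work)
    {
        await _slots.WaitAsync();
        try
        {
            await work();
        }
        finally
        {
            _slots.Release();
        }
    }

    public void Dispose() => _slots.Dispose();
}
```

A dispatch loop would start `DispatchAsync` for each dequeued item without awaiting each call individually (for example by collecting the returned tasks), so concurrency stays capped at the configured limit instead of being left entirely to the thread pool.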
Most best-practice guidance says to rely on the task scheduler and the thread pool to make the right decisions here. I wonder if it would then need to be two properties: one to say whether you want parallelism, and another to set your own concurrency limit instead of relying on the TaskScheduler. That should be easy to do as well if we want it. I'm not sure how much it would be used, though, unless you expect a big pile-up or batches of tasks, but it's easy to add.

The queue is currently just used as a "channel" to funnel tasks into. The implementation takes items from the queue, puts them into a list, and then calls Task.WhenAll on the resulting list of tasks. Using a queue has the benefit of having
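For reference, a simplified sketch of the batching approach described in this comment; this is not the actual AsyncConsumerWorkService code, and the work items are modelled as `Func<Task>` purely for illustration:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Drain whatever is currently queued, start all of it, await the whole batch with
// Task.WhenAll, then move on to the next batch.
public sealed class BatchingWorker
{
    private readonly ConcurrentQueue<Func<Task>> _workQueue = new ConcurrentQueue<Func<Task>>();

    public void Enqueue(Func<Task> work) => _workQueue.Enqueue(work);

    public async Task RunOneBatchAsync()
    {
        var batch = new List<Task>(_workQueue.Count);
        while (_workQueue.TryDequeue(out var work))
        {
            batch.Add(work());
        }
        await Task.WhenAll(batch);
    }
}
```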
To make that possible you would need to expose a custom TaskScheduler as a configuration option. This was once added to the connection factory but then removed again. Even if it were re-added (which I would find highly questionable), the implication is that someone would have to implement a custom scheduler, which is hugely complex. That means in the majority of cases TaskScheduler.Default would be used, which means this code runs with unbounded concurrency. If the injection rate of work is high enough, work dispatches can pile up in the hundreds or thousands concurrently. If that isn't aligned with the potentially constrained resources in use, this optimization could itself turn out to be the bottleneck by overwhelming other weak links in the chain. In my view you almost never want unbounded concurrency by default; you want reasonable defaults that can be changed if needed. For example, one reasonable default could be Environment.ProcessorCount.

Another thing we might want to measure here is the cost of the if on the hot path versus, for example, using a dispatcher.

That being said, I would still like to see an in-depth discussion on this PR about the implications of unbounded vs. bounded concurrency, as well as of lifting the ordering constraints in the worker service and the implications and assumptions that might fall apart in other areas of the code.
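To illustrate the "if on the hot path vs. dispatch" question that the micro-benchmark below compares, here is a hedged sketch with purely illustrative names; it only shows the shape of the alternative (choose a strategy once at setup instead of branching per batch), not the client's actual code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public interface IWorkDispatcher
{
    Task DispatchAsync(IReadOnlyList<Func<Task>> batch);
}

public sealed class SequentialDispatcher : IWorkDispatcher
{
    public async Task DispatchAsync(IReadOnlyList<Func<Task>> batch)
    {
        // Preserves ordering: each item finishes before the next one starts.
        foreach (var work in batch)
        {
            await work();
        }
    }
}

public sealed class ParallelDispatcher : IWorkDispatcher
{
    public Task DispatchAsync(IReadOnlyList<Func<Task>> batch)
    {
        // Starts every item, then awaits the whole batch.
        var running = new List<Task>(batch.Count);
        foreach (var work in batch)
        {
            running.Add(work());
        }
        return Task.WhenAll(running);
    }
}
```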
I realized I might sound like a grumpy old guy who wants to hold up innovation. If I gave that impression, it wasn't my intention at all. My intention is to help guide decision making by mentioning tradeoffs and pros/cons.
queue vs bag with parallelism (danielmarbach/MicroBenchmarks@796c59a)

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=3.1.101
  [Host]    : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
  MediumRun : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
Job=MediumRun  IterationCount=15  LaunchCount=2  WarmupCount=10

branching vs dispatch (danielmarbach/MicroBenchmarks@3f37f6b)

BenchmarkDotNet=v0.12.0, OS=Windows 10.0.18363
Intel Core i7-8550U CPU 1.80GHz (Kaby Lake R), 1 CPU, 8 logical and 4 physical cores
.NET Core SDK=3.1.101
  [Host]     : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
  DefaultJob : .NET Core 3.1.1 (CoreCLR 4.700.19.60701, CoreFX 4.700.19.60801), X64 RyuJIT
No worries @danielmarbach. I specifically asked for your input because I value it :) PRs and discussions around different implementations are where I feel we really learn.

All excellent points. I'd love to do more experiments with this (so don't merge this yet!). One thing I feel this has also surfaced is the need for a better configuration mechanism in the library, rather than everything going through the ConnectionFactory. I also had the thought that a handler pattern like the one used with DelegatingHandlers in HttpClient might be useful to process/augment messages as they are sent and received. That could provide a good abstraction over a lot of common things like workers and parallelism, loggers, and diagnostic scenarios.

Thanks @danielmarbach. I would like to see a reasonable upper bound on concurrency enforced. This feature is not a must-have for

At the moment this is actually a breaking change because there is a modification to a public interface, which goes back to my statements elsewhere about trying to get rid of those from the public API surface.
volatile TaskCompletionSource<bool> _syncSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
private Task _worker;
private readonly List<Task> _workTasks = new List<Task>();
_workTasks can be made into a local that is created per loop iteration; it is not used anywhere else as far as I can see. That way it can even be optimized a little by creating it with the default "predictable" capacity: var tasksToAwaitOnThisIteration = new List<Task>(_workBag.Count).

Is the idea to let the list be as big as the biggest encountered batch?
If you do that, then you are making a tradeoff between the list growing and the allocation per iteration, and the GC cost can quickly become higher. If the suggestion I made about limiting the concurrency is taken into account, the list can have a fixed size and never needs to grow, plus you don't create Gen0 garbage.
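A minimal sketch of the "fixed size, no Gen0 garbage" point above, assuming a bounded concurrency limit; all names are illustrative and not part of the client:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// With a known concurrency limit the list is allocated once with exactly that
// capacity, cleared between iterations, and never has to grow, so iterations do
// not produce extra garbage for the list's backing array.
public sealed class FixedCapacityBatcher
{
    private readonly List<Task> _inFlight;
    private readonly int _limit;

    public FixedCapacityBatcher(int concurrencyLimit)
    {
        _limit = concurrencyLimit;
        _inFlight = new List<Task>(concurrencyLimit); // allocated once, never resized
    }

    public async Task RunBatchAsync(Queue<Func<Task>> pending)
    {
        _inFlight.Clear();
        while (_inFlight.Count < _limit && pending.Count > 0)
        {
            _inFlight.Add(pending.Dequeue()());
        }
        await Task.WhenAll(_inFlight);
    }
}
```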
Let me know if I can help somehow when we start working on this for v7.

Will do :)
I don't think the correct approach is to use Task.WhenAll. That simply batches together the tasks that are queued, waits for them all to finish, and then batches up the next lot. I think the ideal behavior would be to execute tasks as they arrive, up to some specified concurrency limit. I've been working on the same problem, attempting to change the behavior so that async work is executed in parallel. I've tried two approaches:
If anyone is interested, here is a rough implementation for solution 2:

public class ParallelBasicDeliverEventHandler : IDisposable
{
private readonly SemaphoreSlim _semaphore;
public event AsyncEventHandler<BasicDeliverEventArgs> Received;
public ParallelBasicDeliverEventHandler(int maxConcurrent)
{
_semaphore = new SemaphoreSlim(maxConcurrent);
}
public Task Handle(object sender, BasicDeliverEventArgs args)
{
// copy memory so it's available for later
args.Body = args.Body.ToArray();
#pragma warning disable CS4014
// lack of "await" intentional
HandleInternal(sender, args);
#pragma warning restore CS4014
return Task.CompletedTask;
}
private async Task HandleInternal(object sender, BasicDeliverEventArgs args)
{
try
{
await _semaphore.WaitAsync();
try
{
if (Received != null)
{
await Received(sender, args);
}
}
finally
{
_semaphore.Release();
}
}
catch
{
// suppress
}
}
public void Dispose()
{
_semaphore.Dispose();
}
}

And to use:

// how many messages we want to process in parallel (in this case, 10)
const int concurrency = 10;
// configure prefetch so server sends us multiple messages
model.BasicQos(0, concurrency, false);
// wire up consumer
var consumer = new AsyncEventingBasicConsumer(model);
var handler = new ParallelBasicDeliverEventHandler(concurrency);
consumer.Received += handler.Handle;
handler.Received += async (sender, args) =>
{
Console.WriteLine($"Received {args.DeliveryTag}");
await Task.Delay(1000);
var consumer = (AsyncEventingBasicConsumer)sender;
consumer.Model.BasicAck(args.DeliveryTag, false);
};
// begin consuming
model.BasicConsume("test", false, consumer);

Hope this helps.
That is exactly what I have been suggesting instead.
I skimmed through the issue history too quickly! I agree with your points. Ordering shouldn't matter on the client side; I don't understand how you could guarantee ordering and execute in parallel at the same time. It would be much safer to explicitly limit parallelism. While unbounded parallelism might be safe if, for example, you didn't autoAck and your prefetch was sane, I don't think it's worth the risk when limiting it is so straightforward.

Regarding the default limit: as we're talking about async, I'm not sure setting it to the processor count is the best default. It would make sense if the handler was doing mostly CPU-intensive work, but in reality I think we're usually waiting on a response from an external service. That said, it's probably the safest default, as it caters to those doing CPU-intensive work, while the rest of us will probably want to tweak it anyway. My only other suggestion for the default would be the prefetch count (or some function of it), as I think people generally increase prefetch when they want to go faster.
The assumption behind the processor count is that, by default, the IO thread pool is assumed to have a fixed number of threads corresponding to the concurrency count and never ramps up: https://docs.microsoft.com/en-us/dotnet/api/system.threading.threadpool.setmaxthreads

Anyway, I think your suggestions are great input to consider once we start implementing this for the next version of the client.
Ordering does matter, as it is what users expect by default. This is why "batching tasks" is an opt-in feature.
To clarify, my comments were in the context of executing multiple work items in parallel. I do agree that the library should use a FIFO queue when dispatching work (as it does). This gives you ordering if you limit concurrent work to 1; but if it's greater than 1, we should expect work to complete out of order.
As an idea/spike: #866

This PR is superseded by #866. Thanks everyone!
Proposed Changes
This change batches the tasks run by the AsyncConsumerWorkService to increase throughput by parallelizing task execution.