-
Notifications
You must be signed in to change notification settings - Fork 605
Pipelines implementation and allocation improvements #706
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
Show all changes
14 commits
Select commit
Hold shift + click to select a range
a64bd77
First attempt at reducing allocs.
stebet aea5316
Adding ArrayPool and RecyclableMemoryStream.
1ecff67
Adding Pipeline support.
2d665c3
Fixing pipeline.
08dacfc
More fixes.
eba87ae
Allocations pretty much constant now, as we are writing directly to t…
stebet 5e40529
Fixing bug in the writer.
stebet 18e152e
All tests except ApiApproval now run. Allocations severely reduced.
df6503f
Merging with master.
03ad67d
Cleaning up code to be inline with master branch.
78784ed
Removing additional debug symbols.
34fb4bc
Improving the connection logic.
f397358
Optimizing the multiple-confirm logic, pulling it out into it's own t…
stebet b4d63bc
Cleaning up code.
stebet File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
using System; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
|
||
using RabbitMQ.Client; | ||
using RabbitMQ.Client.Events; | ||
|
||
namespace DeadlockRabbitMQ | ||
{ | ||
class Program | ||
{ | ||
private static int messagesSent = 0; | ||
private static int messagesReceived = 0; | ||
private static int batchesToSend = 100; | ||
private static int itemsPerBatch = 500; | ||
static async Task Main(string[] args) | ||
{ | ||
Console.ReadLine(); | ||
var connectionString = new Uri("amqp://guest:guest@localhost/"); | ||
|
||
var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString }; | ||
var connection = connectionFactory.CreateConnection(); | ||
var connection2 = connectionFactory.CreateConnection(); | ||
var publisher = connection.CreateModel(); | ||
var subscriber = connection2.CreateModel(); | ||
publisher.ConfirmSelect(); | ||
//subscriber.ConfirmSelect(); | ||
|
||
publisher.ExchangeDeclare("test", ExchangeType.Topic, true); | ||
|
||
subscriber.QueueDeclare("testqueue", true, false, true); | ||
var asyncListener = new AsyncEventingBasicConsumer(subscriber); | ||
asyncListener.Received += AsyncListener_Received; | ||
subscriber.QueueBind("testqueue", "test", "myawesome.routing.key"); | ||
subscriber.BasicConsume("testqueue", false, "testconsumer", asyncListener); | ||
|
||
byte[] payload = new byte[16384]; | ||
var batchPublish = Task.Run(async () => | ||
{ | ||
while (messagesSent < batchesToSend * itemsPerBatch) | ||
{ | ||
var batch = publisher.CreateBasicPublishBatch(); | ||
for (int i = 0; i < itemsPerBatch; i++) | ||
{ | ||
var properties = publisher.CreateBasicProperties(); | ||
properties.AppId = "testapp"; | ||
properties.CorrelationId = Guid.NewGuid().ToString(); | ||
batch.Add("test", "myawesome.routing.key", false, properties, payload); | ||
} | ||
batch.Publish(); | ||
messagesSent += itemsPerBatch; | ||
publisher.WaitForConfirmsOrDie(); | ||
} | ||
}); | ||
|
||
var sentTask = Task.Run(async () => | ||
{ | ||
while (messagesSent < batchesToSend * itemsPerBatch) | ||
{ | ||
Console.WriteLine($"Messages sent: {messagesSent}"); | ||
|
||
await Task.Delay(500); | ||
} | ||
|
||
Console.WriteLine("Done sending messages!"); | ||
}); | ||
|
||
var receivedTask = Task.Run(async () => | ||
{ | ||
while (messagesReceived < batchesToSend * itemsPerBatch) | ||
{ | ||
Console.WriteLine($"Messages received: {messagesReceived}"); | ||
|
||
await Task.Delay(500); | ||
} | ||
|
||
Console.WriteLine("Done receiving all messages."); | ||
}); | ||
|
||
await Task.WhenAll(sentTask, receivedTask); | ||
Console.ReadLine(); | ||
} | ||
|
||
private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event) | ||
{ | ||
// Doing things in parallel here is what will eventually trigger the deadlock, | ||
// probably due to a race condition in AsyncConsumerWorkService.Loop, although | ||
// I've had trouble pinpointing it exactly, but due to how the code in there uses | ||
// a TaskCompletionSource, and elsewhere overrides it, it might cause Enqueue and Loop | ||
// to eventually be working with different references, or that's at least the current theory. | ||
// Moving to better synchronization constructs solves the issue, and using the ThreadPool | ||
// is standard practice as well to maximize core utilization and reduce overhead of Thread creation | ||
Interlocked.Increment(ref messagesReceived); | ||
(sender as AsyncDefaultBasicConsumer).Model.BasicAck(@event.DeliveryTag, true); | ||
return Task.CompletedTask; | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
<Project Sdk="Microsoft.NET.Sdk"> | ||
|
||
<PropertyGroup> | ||
<OutputType>Exe</OutputType> | ||
<TargetFramework>netcoreapp3.1</TargetFramework> | ||
<DebugType>full</DebugType> | ||
</PropertyGroup> | ||
|
||
<ItemGroup> | ||
<ProjectReference Include="..\projects\client\RabbitMQ.Client\RabbitMQ.Client.csproj" /> | ||
</ItemGroup> | ||
|
||
</Project> |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
53 changes: 53 additions & 0 deletions
53
projects/client/RabbitMQ.Client/src/client/impl/AsyncRpcContinuation.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,53 @@ | ||
using System; | ||
using System.Collections.Generic; | ||
using System.Linq; | ||
using System.Text; | ||
using System.Threading; | ||
using System.Threading.Tasks; | ||
using RabbitMQ.Client.Exceptions; | ||
|
||
namespace RabbitMQ.Client.Impl | ||
{ | ||
public class AsyncRpcContinuation : IRpcContinuation | ||
{ | ||
private readonly TaskCompletionSource<Command> _taskCompletionSource = new TaskCompletionSource<Command>(TaskCreationOptions.RunContinuationsAsynchronously); | ||
|
||
public virtual async ValueTask<Command> GetReplyAsync(CancellationToken cancellationToken = default) | ||
{ | ||
try | ||
{ | ||
if (cancellationToken != default) | ||
{ | ||
using (cancellationToken.Register(() => _taskCompletionSource.TrySetCanceled(cancellationToken))) | ||
{ | ||
return await _taskCompletionSource.Task.ConfigureAwait(false); | ||
} | ||
} | ||
|
||
return await _taskCompletionSource.Task.ConfigureAwait(false); | ||
} | ||
catch (OperationInterruptedException) | ||
{ | ||
throw; | ||
} | ||
} | ||
|
||
public virtual async ValueTask<Command> GetReplyAsync(TimeSpan timeout) | ||
{ | ||
using (CancellationTokenSource cts = new CancellationTokenSource(timeout)) | ||
{ | ||
return await GetReplyAsync(cts.Token).ConfigureAwait(false); | ||
} | ||
} | ||
|
||
public void HandleCommand(Command cmd) | ||
{ | ||
_taskCompletionSource.TrySetResult(cmd); | ||
} | ||
|
||
public void HandleModelShutdown(ShutdownEventArgs reason) | ||
{ | ||
_taskCompletionSource.TrySetException(new OperationInterruptedException(reason)); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should not be conditioned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's part of the "rebase on
master
" work 😄There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, but there's no reason to include it for netstandard builds either as it's a noop there anyway. It's simpler to skip the condition though :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FWIW with the condition the build fails in my Linux and OS X environment 🤷♂