Skip to content

Pipelines implementation and allocation improvements #706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions RabbitMQDotNetClient.sln
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@ EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{34486CC0-D61E-46BA-9E5E-6E8EFA7C34B5}"
ProjectSection(SolutionItems) = preProject
.editorconfig = .editorconfig
docs\specs\amqp0-9-1.xml = docs\specs\amqp0-9-1.xml
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQTest", "RabbitMQTest\RabbitMQTest.csproj", "{3CFAC019-8281-48AD-8925-16CE8EC3CE50}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -104,6 +107,30 @@ Global
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x64.Build.0 = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{B416DDB7-5E3E-4A20-B5A9-C6E518E203A2}.SignedRelease|x86.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x64.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x64.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x86.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Debug|x86.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|Any CPU.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|Any CPU.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x64.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x64.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x86.ActiveCfg = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.DebugNoTest|x86.Build.0 = Debug|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|Any CPU.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x64.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x64.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x86.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.Release|x86.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|Any CPU.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|Any CPU.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x64.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x64.Build.0 = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x86.ActiveCfg = Release|Any CPU
{3CFAC019-8281-48AD-8925-16CE8EC3CE50}.SignedRelease|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
98 changes: 98 additions & 0 deletions RabbitMQTest/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
using System;
using System.Threading;
using System.Threading.Tasks;

using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace DeadlockRabbitMQ
{
class Program
{
private static int messagesSent = 0;
private static int messagesReceived = 0;
private static int batchesToSend = 100;
private static int itemsPerBatch = 500;
static async Task Main(string[] args)
{
Console.ReadLine();
var connectionString = new Uri("amqp://guest:guest@localhost/");

var connectionFactory = new ConnectionFactory() { DispatchConsumersAsync = true, Uri = connectionString };
var connection = connectionFactory.CreateConnection();
var connection2 = connectionFactory.CreateConnection();
var publisher = connection.CreateModel();
var subscriber = connection2.CreateModel();
publisher.ConfirmSelect();
//subscriber.ConfirmSelect();

publisher.ExchangeDeclare("test", ExchangeType.Topic, true);

subscriber.QueueDeclare("testqueue", true, false, true);
var asyncListener = new AsyncEventingBasicConsumer(subscriber);
asyncListener.Received += AsyncListener_Received;
subscriber.QueueBind("testqueue", "test", "myawesome.routing.key");
subscriber.BasicConsume("testqueue", false, "testconsumer", asyncListener);

byte[] payload = new byte[16384];
var batchPublish = Task.Run(async () =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
var batch = publisher.CreateBasicPublishBatch();
for (int i = 0; i < itemsPerBatch; i++)
{
var properties = publisher.CreateBasicProperties();
properties.AppId = "testapp";
properties.CorrelationId = Guid.NewGuid().ToString();
batch.Add("test", "myawesome.routing.key", false, properties, payload);
}
batch.Publish();
messagesSent += itemsPerBatch;
publisher.WaitForConfirmsOrDie();
}
});

var sentTask = Task.Run(async () =>
{
while (messagesSent < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages sent: {messagesSent}");

await Task.Delay(500);
}

Console.WriteLine("Done sending messages!");
});

var receivedTask = Task.Run(async () =>
{
while (messagesReceived < batchesToSend * itemsPerBatch)
{
Console.WriteLine($"Messages received: {messagesReceived}");

await Task.Delay(500);
}

Console.WriteLine("Done receiving all messages.");
});

await Task.WhenAll(sentTask, receivedTask);
Console.ReadLine();
}

private static Task AsyncListener_Received(object sender, BasicDeliverEventArgs @event)
{
// Doing things in parallel here is what will eventually trigger the deadlock,
// probably due to a race condition in AsyncConsumerWorkService.Loop, although
// I've had trouble pinpointing it exactly, but due to how the code in there uses
// a TaskCompletionSource, and elsewhere overrides it, it might cause Enqueue and Loop
// to eventually be working with different references, or that's at least the current theory.
// Moving to better synchronization constructs solves the issue, and using the ThreadPool
// is standard practice as well to maximize core utilization and reduce overhead of Thread creation
Interlocked.Increment(ref messagesReceived);
(sender as AsyncDefaultBasicConsumer).Model.BasicAck(@event.DeliveryTag, true);
return Task.CompletedTask;
}
}
}
13 changes: 13 additions & 0 deletions RabbitMQTest/RabbitMQTest.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DebugType>full</DebugType>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\projects\client\RabbitMQ.Client\RabbitMQ.Client.csproj" />
</ItemGroup>

</Project>
5 changes: 4 additions & 1 deletion projects/client/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" />
<PackageReference Include="Pipelines.Sockets.Unofficial" Version="2.1.1" />
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
<PackageReference Include="Microsoft.IO.RecyclableMemoryStream" Version="1.3.2" />
<PackageReference Include="Microsoft.NETFramework.ReferenceAssemblies" Version="1.0.0" PrivateAssets="All" Condition="$('TargetFramework') == ('net461')" />
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be conditioned.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's part of the "rebase on master" work 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but there's no reason to include it for netstandard builds either as it's a noop there anyway. It's simpler to skip the condition though :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW with the condition the build fails in my Linux and OS X environment 🤷‍♂

</ItemGroup>

<ItemGroup Condition=" '$(Configuration)' == 'SignedRelease' ">
Expand Down
7 changes: 4 additions & 3 deletions projects/client/RabbitMQ.Client/src/client/api/IModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

using RabbitMQ.Client.Apigen.Attributes;
using RabbitMQ.Client.Events;

namespace RabbitMQ.Client
{
Expand Down
5 changes: 3 additions & 2 deletions projects/client/RabbitMQ.Client/src/client/api/ITcpClient.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Threading.Tasks;
using System.IO;
using System.Net.Sockets;
using System.Threading.Tasks;

namespace RabbitMQ.Client
{
Expand All @@ -18,7 +19,7 @@ public interface ITcpClient : IDisposable

Task ConnectAsync(string host, int port);

NetworkStream GetStream();
Stream GetStream();

void Close();
}
Expand Down
4 changes: 1 addition & 3 deletions projects/client/RabbitMQ.Client/src/client/api/SslHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ public static Stream TcpUpgrade(Stream tcpStream, SslOption sslOption)

var sslStream = new SslStream(tcpStream, false, remoteCertValidator, localCertSelector);

sslStream.AuthenticateAsClientAsync(sslOption.ServerName, sslOption.Certs, sslOption.Version,
sslOption.CheckCertificateRevocation).GetAwaiter().GetResult();

sslStream.AuthenticateAsClient(sslOption.ServerName, sslOption.Certs, sslOption.Version, sslOption.CheckCertificateRevocation);
return sslStream;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

using System.Collections.Generic;
using System.IO;

using RabbitMQ.Util;

namespace RabbitMQ.Client.Content
Expand Down Expand Up @@ -73,7 +74,7 @@ public BasicMessageBuilder(IModel model) : this(model, DefaultAccumulatorSize)
public BasicMessageBuilder(IModel model, int initialAccumulatorSize)
{
Properties = model.CreateBasicProperties();
m_accumulator = new MemoryStream(initialAccumulatorSize);
m_accumulator = PooledMemoryStream.GetMemoryStream(initialAccumulatorSize);

string contentType = GetDefaultContentType();
if (contentType != null)
Expand Down Expand Up @@ -182,6 +183,6 @@ public IMessageBuilder RawWrite(byte[] bytes, int offset, int length)
{
BodyStream.Write(bytes, offset, length);
return this;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System.Buffers;
using System.Text;
using RabbitMQ.Util;

Expand Down Expand Up @@ -97,8 +98,11 @@ public static float ReadSingle(NetworkBinaryReader reader)
public static string ReadString(NetworkBinaryReader reader)
{
ushort length = reader.ReadUInt16();
byte[] bytes = reader.ReadBytes(length);
return Encoding.UTF8.GetString(bytes, 0, bytes.Length);
byte[] bytes = ArrayPool<byte>.Shared.Rent(length);
reader.Read(bytes, 0, length);
string returnValue = Encoding.UTF8.GetString(bytes, 0, bytes.Length);
ArrayPool<byte>.Shared.Return(bytes);
return returnValue;
}

public static void Write(NetworkBinaryWriter writer, byte[] source, int offset, int count)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
//---------------------------------------------------------------------------

using System;
using System.Buffers;
using System.IO;
using System.Net;
using System.Text;
Expand Down Expand Up @@ -293,16 +295,25 @@ public static string ReadString(NetworkBinaryReader reader)

public static string ReadUntypedString(NetworkBinaryReader reader)
{
BinaryWriter buffer = NetworkBinaryWriter.TemporaryBinaryWriter(256);
while (true)
byte[] array = null;
try
{
byte b = reader.ReadByte();
if (b == 0)
array = ArrayPool<byte>.Shared.Rent(256);
int index = 0;
while (true)
{
byte[] temporaryContents = NetworkBinaryWriter.TemporaryContents(buffer);
return Encoding.UTF8.GetString(temporaryContents, 0, temporaryContents.Length);
byte b = reader.ReadByte();
if (b == 0)
{
return Encoding.UTF8.GetString(array, 0, index);
}

array[index++] = b;
}
buffer.Write(b);
}
finally
{
ArrayPool<byte>.Shared.Return(array);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;

namespace RabbitMQ.Client.Impl
{
public class AsyncRpcContinuation : IRpcContinuation
{
private readonly TaskCompletionSource<Command> _taskCompletionSource = new TaskCompletionSource<Command>(TaskCreationOptions.RunContinuationsAsynchronously);

public virtual async ValueTask<Command> GetReplyAsync(CancellationToken cancellationToken = default)
{
try
{
if (cancellationToken != default)
{
using (cancellationToken.Register(() => _taskCompletionSource.TrySetCanceled(cancellationToken)))
{
return await _taskCompletionSource.Task.ConfigureAwait(false);
}
}

return await _taskCompletionSource.Task.ConfigureAwait(false);
}
catch (OperationInterruptedException)
{
throw;
}
}

public virtual async ValueTask<Command> GetReplyAsync(TimeSpan timeout)
{
using (CancellationTokenSource cts = new CancellationTokenSource(timeout))
{
return await GetReplyAsync(cts.Token).ConfigureAwait(false);
}
}

public void HandleCommand(Command cmd)
{
_taskCompletionSource.TrySetResult(cmd);
}

public void HandleModelShutdown(ShutdownEventArgs reason)
{
_taskCompletionSource.TrySetException(new OperationInterruptedException(reason));
}
}
}
Loading