Skip to content

reduce the amount of times we rent / return from arraypool #919

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,10 @@ string GetParameterString(ParameterInfo pi)
{
return "cmd.Body";
}
else if (Attribute(pi, typeof(AmqpContentBodyArrayMappingAttribute)) != null)
{
return "cmd.TakeoverPayload()";
}
else
{
return $"__impl._{(!(Attribute(pi, typeof(AmqpFieldMappingAttribute)) is AmqpFieldMappingAttribute fieldMapping) ? pi.Name : fieldMapping.m_fieldName)}";
Expand Down
44 changes: 40 additions & 4 deletions projects/RabbitMQ.Client/client/api/BasicGetResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,18 @@
//---------------------------------------------------------------------------

using System;
using System.Buffers;

namespace RabbitMQ.Client
{
/// <summary>Represents Basic.GetOk responses from the server.</summary>
/// <remarks>
/// Basic.Get either returns an instance of this class, or null if a Basic.GetEmpty was received.
/// </remarks>
public class BasicGetResult
public sealed class BasicGetResult : IDisposable
{
private readonly byte[] _rentedArray;

/// <summary>
/// Sets the new instance's properties from the arguments passed in.
/// </summary>
Expand All @@ -48,9 +51,9 @@ public class BasicGetResult
/// <param name="routingKey">Routing key with which the message was published.</param>
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body"></param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
string routingKey, uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
/// <param name="body">The body</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Expand All @@ -61,6 +64,30 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
Body = body;
}

/// <summary>
/// Sets the new instance's properties from the arguments passed in.
/// </summary>
/// <param name="deliveryTag">Delivery tag for the message.</param>
/// <param name="redelivered">Redelivered flag for the message</param>
/// <param name="exchange">The exchange this message was published to.</param>
/// <param name="routingKey">Routing key with which the message was published.</param>
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
/// <param name="body">The body</param>
/// <param name="rentedArray">The rented array which body is part of.</param>
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
{
DeliveryTag = deliveryTag;
Redelivered = redelivered;
Exchange = exchange;
RoutingKey = routingKey;
MessageCount = messageCount;
BasicProperties = basicProperties;
Body = body;
_rentedArray = rentedArray;
}

/// <summary>
/// Retrieves the Basic-class content header properties for this message.
/// </summary>
Expand Down Expand Up @@ -99,5 +126,14 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
/// Retrieve the routing key with which this message was published.
/// </summary>
public string RoutingKey { get; private set; }

/// <inheritdoc />
public void Dispose()
{
if (!(_rentedArray is null))
{
ArrayPool<byte>.Shared.Return(_rentedArray);
}
}
}
}
14 changes: 5 additions & 9 deletions projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlySpan<byte> body)
ReadOnlyMemory<byte> body,
byte[] rentedArray)
{
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(body.Length);
Memory<byte> bodyCopy = new Memory<byte>(bodyBytes, 0, body.Length);
body.CopyTo(bodyCopy.Span);
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy));
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
}

public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
Expand All @@ -69,17 +67,15 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
Schedule(new ModelShutdown(consumer, reason, _model));
}

private void ScheduleUnlessShuttingDown<TWork>(TWork work)
where TWork : Work
private void ScheduleUnlessShuttingDown(Work work)
{
if (!IsShutdown)
{
Schedule(work);
}
}

private void Schedule<TWork>(TWork work)
where TWork : Work
private void Schedule(Work work)
{
_workService.Schedule(_model, work);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency)
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
}

public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
public void Schedule(IModel model, Work work)
{
/*
* rabbitmq/rabbitmq-dotnet-client#841
Expand Down
18 changes: 9 additions & 9 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,15 +564,15 @@ public void HandleBasicDeliver(string consumerTag,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body,
byte[] rentedArray)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

_delegate.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange,
routingKey, basicProperties, body);
_delegate.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray);
}

public void HandleBasicGetEmpty()
Expand All @@ -591,15 +591,15 @@ public void HandleBasicGetOk(ulong deliveryTag,
string routingKey,
uint messageCount,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body,
byte[] rentedArray)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

_delegate.HandleBasicGetOk(deliveryTag, redelivered, exchange, routingKey,
messageCount, basicProperties, body);
_delegate.HandleBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount, basicProperties, body, rentedArray);
}

public void HandleBasicNack(ulong deliveryTag,
Expand Down Expand Up @@ -629,15 +629,15 @@ public void HandleBasicReturn(ushort replyCode,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body)
ReadOnlyMemory<byte> body,
byte[] rentedArray)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

_delegate.HandleBasicReturn(replyCode, replyText, exchange,
routingKey, basicProperties, body);
_delegate.HandleBasicReturn(replyCode, replyText, exchange, routingKey, basicProperties, body, rentedArray);
}

public void HandleChannelClose(ushort replyCode,
Expand Down
11 changes: 5 additions & 6 deletions projects/RabbitMQ.Client/client/impl/BasicDeliver.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
using System;
using System.Buffers;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

namespace RabbitMQ.Client.Impl
Expand All @@ -14,6 +13,7 @@ internal sealed class BasicDeliver : Work
private readonly string _routingKey;
private readonly IBasicProperties _basicProperties;
private readonly ReadOnlyMemory<byte> _body;
private readonly byte[] _rentedBytes;

public override string Context => "HandleBasicDeliver";

Expand All @@ -24,7 +24,8 @@ public BasicDeliver(IBasicConsumer consumer,
string exchange,
string routingKey,
IBasicProperties basicProperties,
ReadOnlyMemory<byte> body) : base(consumer)
ReadOnlyMemory<byte> body,
byte[] rentedBytes) : base(consumer)
{
_consumerTag = consumerTag;
_deliveryTag = deliveryTag;
Expand All @@ -33,6 +34,7 @@ public BasicDeliver(IBasicConsumer consumer,
_routingKey = routingKey;
_basicProperties = basicProperties;
_body = body;
_rentedBytes = rentedBytes;
}

protected override Task Execute(IAsyncBasicConsumer consumer)
Expand All @@ -48,10 +50,7 @@ protected override Task Execute(IAsyncBasicConsumer consumer)

public override void PostExecute()
{
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
}
ArrayPool<byte>.Shared.Return(_rentedBytes);
}
}
}
41 changes: 28 additions & 13 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ internal sealed class CommandAssembler
private MethodBase _method;
private ContentHeaderBase _header;
private byte[] _bodyBytes;
private Memory<byte> _body;
private ReadOnlyMemory<byte> _body;
private int _remainingBodyBytes;
private int _offset;
private AssemblyState _state;
Expand All @@ -62,14 +62,15 @@ private void Reset()
_method = null;
_header = null;
_bodyBytes = null;
_body = Memory<byte>.Empty;
_body = ReadOnlyMemory<byte>.Empty;
_remainingBodyBytes = 0;
_offset = 0;
_state = AssemblyState.ExpectingMethod;
}

public IncomingCommand HandleFrame(in InboundFrame frame)
public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
{
bool shallReturn = true;
switch (_state)
{
case AssemblyState.ExpectingMethod:
Expand All @@ -79,18 +80,19 @@ public IncomingCommand HandleFrame(in InboundFrame frame)
ParseHeaderFrame(in frame);
break;
case AssemblyState.ExpectingContentBody:
ParseBodyFrame(in frame);
shallReturn = ParseBodyFrame(in frame);
break;
}

if (_state != AssemblyState.Complete)
{
return IncomingCommand.Empty;
command = IncomingCommand.Empty;
return true;
}

var result = new IncomingCommand(_method, _header, _body, _bodyBytes);
command = new IncomingCommand(_method, _header, _body, _bodyBytes);
Reset();
return result;
return shallReturn;
}

private void ParseMethodFrame(in InboundFrame frame)
Expand Down Expand Up @@ -121,14 +123,10 @@ private void ParseHeaderFrame(in InboundFrame frame)
}

_remainingBodyBytes = (int) totalBodyBytes;

// Is returned by IncomingCommand.Dispose in Session.HandleFrame
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
_body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes);
UpdateContentBodyState();
}

private void ParseBodyFrame(in InboundFrame frame)
private bool ParseBodyFrame(in InboundFrame frame)
{
if (frame.Type != FrameType.FrameBody)
{
Expand All @@ -141,10 +139,27 @@ private void ParseBodyFrame(in InboundFrame frame)
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyBytes} bytes remaining, {payloadLength} bytes received");
}

frame.Payload.CopyTo(_body.Slice(_offset));
if (_bodyBytes is null)
{
// check for single frame payload for an early exit
if (payloadLength == _remainingBodyBytes)
{
_bodyBytes = frame.TakeoverPayload();
_body = frame.Payload;
_state = AssemblyState.Complete;
return false;
}

// Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
_body = new ReadOnlyMemory<byte>(_bodyBytes, 0, _remainingBodyBytes);
}

frame.Payload.Span.CopyTo(_bodyBytes.AsSpan(_offset));
_remainingBodyBytes -= payloadLength;
_offset += payloadLength;
UpdateContentBodyState();
return true;
}

private void UpdateContentBodyState()
Expand Down
Loading