Skip to content

Commit 1125283

Browse files
Merge pull request #919 from bollhals/forwardBody
reduce the amount of times we rent / return from arraypool
2 parents e98e97c + 9ee839d commit 1125283

21 files changed

+251
-211
lines changed

projects/Apigen/apigen/Apigen.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1410,6 +1410,10 @@ string GetParameterString(ParameterInfo pi)
14101410
{
14111411
return "cmd.Body";
14121412
}
1413+
else if (Attribute(pi, typeof(AmqpContentBodyArrayMappingAttribute)) != null)
1414+
{
1415+
return "cmd.TakeoverPayload()";
1416+
}
14131417
else
14141418
{
14151419
return $"__impl._{(!(Attribute(pi, typeof(AmqpFieldMappingAttribute)) is AmqpFieldMappingAttribute fieldMapping) ? pi.Name : fieldMapping.m_fieldName)}";

projects/RabbitMQ.Client/client/api/BasicGetResult.cs

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,18 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334

3435
namespace RabbitMQ.Client
3536
{
3637
/// <summary>Represents Basic.GetOk responses from the server.</summary>
3738
/// <remarks>
3839
/// Basic.Get either returns an instance of this class, or null if a Basic.GetEmpty was received.
3940
/// </remarks>
40-
public class BasicGetResult
41+
public sealed class BasicGetResult : IDisposable
4142
{
43+
private readonly byte[] _rentedArray;
44+
4245
/// <summary>
4346
/// Sets the new instance's properties from the arguments passed in.
4447
/// </summary>
@@ -48,9 +51,9 @@ public class BasicGetResult
4851
/// <param name="routingKey">Routing key with which the message was published.</param>
4952
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
5053
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
51-
/// <param name="body"></param>
52-
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
53-
string routingKey, uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
54+
/// <param name="body">The body</param>
55+
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
56+
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body)
5457
{
5558
DeliveryTag = deliveryTag;
5659
Redelivered = redelivered;
@@ -61,6 +64,30 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
6164
Body = body;
6265
}
6366

67+
/// <summary>
68+
/// Sets the new instance's properties from the arguments passed in.
69+
/// </summary>
70+
/// <param name="deliveryTag">Delivery tag for the message.</param>
71+
/// <param name="redelivered">Redelivered flag for the message</param>
72+
/// <param name="exchange">The exchange this message was published to.</param>
73+
/// <param name="routingKey">Routing key with which the message was published.</param>
74+
/// <param name="messageCount">The number of messages pending on the queue, excluding the message being delivered.</param>
75+
/// <param name="basicProperties">The Basic-class content header properties for the message.</param>
76+
/// <param name="body">The body</param>
77+
/// <param name="rentedArray">The rented array which body is part of.</param>
78+
public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange, string routingKey,
79+
uint messageCount, IBasicProperties basicProperties, ReadOnlyMemory<byte> body, byte[] rentedArray)
80+
{
81+
DeliveryTag = deliveryTag;
82+
Redelivered = redelivered;
83+
Exchange = exchange;
84+
RoutingKey = routingKey;
85+
MessageCount = messageCount;
86+
BasicProperties = basicProperties;
87+
Body = body;
88+
_rentedArray = rentedArray;
89+
}
90+
6491
/// <summary>
6592
/// Retrieves the Basic-class content header properties for this message.
6693
/// </summary>
@@ -99,5 +126,14 @@ public BasicGetResult(ulong deliveryTag, bool redelivered, string exchange,
99126
/// Retrieve the routing key with which this message was published.
100127
/// </summary>
101128
public string RoutingKey { get; private set; }
129+
130+
/// <inheritdoc />
131+
public void Dispose()
132+
{
133+
if (!(_rentedArray is null))
134+
{
135+
ArrayPool<byte>.Shared.Return(_rentedArray);
136+
}
137+
}
102138
}
103139
}

projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,10 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
4545
string exchange,
4646
string routingKey,
4747
IBasicProperties basicProperties,
48-
ReadOnlySpan<byte> body)
48+
ReadOnlyMemory<byte> body,
49+
byte[] rentedArray)
4950
{
50-
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(body.Length);
51-
Memory<byte> bodyCopy = new Memory<byte>(bodyBytes, 0, body.Length);
52-
body.CopyTo(bodyCopy.Span);
53-
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy));
51+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray));
5452
}
5553

5654
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)
@@ -69,17 +67,15 @@ public void HandleModelShutdown(IBasicConsumer consumer, ShutdownEventArgs reaso
6967
Schedule(new ModelShutdown(consumer, reason, _model));
7068
}
7169

72-
private void ScheduleUnlessShuttingDown<TWork>(TWork work)
73-
where TWork : Work
70+
private void ScheduleUnlessShuttingDown(Work work)
7471
{
7572
if (!IsShutdown)
7673
{
7774
Schedule(work);
7875
}
7976
}
8077

81-
private void Schedule<TWork>(TWork work)
82-
where TWork : Work
78+
private void Schedule(Work work)
8379
{
8480
_workService.Schedule(_model, work);
8581
}

projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ public AsyncConsumerWorkService(int concurrency) : base(concurrency)
1818
_startNewWorkPoolFunc = model => StartNewWorkPool(model);
1919
}
2020

21-
public void Schedule<TWork>(IModel model, TWork work) where TWork : Work
21+
public void Schedule(IModel model, Work work)
2222
{
2323
/*
2424
* rabbitmq/rabbitmq-dotnet-client#841

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -564,15 +564,15 @@ public void HandleBasicDeliver(string consumerTag,
564564
string exchange,
565565
string routingKey,
566566
IBasicProperties basicProperties,
567-
ReadOnlyMemory<byte> body)
567+
ReadOnlyMemory<byte> body,
568+
byte[] rentedArray)
568569
{
569570
if (_disposed)
570571
{
571572
throw new ObjectDisposedException(GetType().FullName);
572573
}
573574

574-
_delegate.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange,
575-
routingKey, basicProperties, body);
575+
_delegate.HandleBasicDeliver(consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, body, rentedArray);
576576
}
577577

578578
public void HandleBasicGetEmpty()
@@ -591,15 +591,15 @@ public void HandleBasicGetOk(ulong deliveryTag,
591591
string routingKey,
592592
uint messageCount,
593593
IBasicProperties basicProperties,
594-
ReadOnlyMemory<byte> body)
594+
ReadOnlyMemory<byte> body,
595+
byte[] rentedArray)
595596
{
596597
if (_disposed)
597598
{
598599
throw new ObjectDisposedException(GetType().FullName);
599600
}
600601

601-
_delegate.HandleBasicGetOk(deliveryTag, redelivered, exchange, routingKey,
602-
messageCount, basicProperties, body);
602+
_delegate.HandleBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount, basicProperties, body, rentedArray);
603603
}
604604

605605
public void HandleBasicNack(ulong deliveryTag,
@@ -629,15 +629,15 @@ public void HandleBasicReturn(ushort replyCode,
629629
string exchange,
630630
string routingKey,
631631
IBasicProperties basicProperties,
632-
ReadOnlyMemory<byte> body)
632+
ReadOnlyMemory<byte> body,
633+
byte[] rentedArray)
633634
{
634635
if (_disposed)
635636
{
636637
throw new ObjectDisposedException(GetType().FullName);
637638
}
638639

639-
_delegate.HandleBasicReturn(replyCode, replyText, exchange,
640-
routingKey, basicProperties, body);
640+
_delegate.HandleBasicReturn(replyCode, replyText, exchange, routingKey, basicProperties, body, rentedArray);
641641
}
642642

643643
public void HandleChannelClose(ushort replyCode,

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System;
22
using System.Buffers;
3-
using System.Runtime.InteropServices;
43
using System.Threading.Tasks;
54

65
namespace RabbitMQ.Client.Impl
@@ -14,6 +13,7 @@ internal sealed class BasicDeliver : Work
1413
private readonly string _routingKey;
1514
private readonly IBasicProperties _basicProperties;
1615
private readonly ReadOnlyMemory<byte> _body;
16+
private readonly byte[] _rentedBytes;
1717

1818
public override string Context => "HandleBasicDeliver";
1919

@@ -24,7 +24,8 @@ public BasicDeliver(IBasicConsumer consumer,
2424
string exchange,
2525
string routingKey,
2626
IBasicProperties basicProperties,
27-
ReadOnlyMemory<byte> body) : base(consumer)
27+
ReadOnlyMemory<byte> body,
28+
byte[] rentedBytes) : base(consumer)
2829
{
2930
_consumerTag = consumerTag;
3031
_deliveryTag = deliveryTag;
@@ -33,6 +34,7 @@ public BasicDeliver(IBasicConsumer consumer,
3334
_routingKey = routingKey;
3435
_basicProperties = basicProperties;
3536
_body = body;
37+
_rentedBytes = rentedBytes;
3638
}
3739

3840
protected override Task Execute(IAsyncBasicConsumer consumer)
@@ -48,10 +50,7 @@ protected override Task Execute(IAsyncBasicConsumer consumer)
4850

4951
public override void PostExecute()
5052
{
51-
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
52-
{
53-
ArrayPool<byte>.Shared.Return(segment.Array);
54-
}
53+
ArrayPool<byte>.Shared.Return(_rentedBytes);
5554
}
5655
}
5756
}

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal sealed class CommandAssembler
4646
private MethodBase _method;
4747
private ContentHeaderBase _header;
4848
private byte[] _bodyBytes;
49-
private Memory<byte> _body;
49+
private ReadOnlyMemory<byte> _body;
5050
private int _remainingBodyBytes;
5151
private int _offset;
5252
private AssemblyState _state;
@@ -62,14 +62,15 @@ private void Reset()
6262
_method = null;
6363
_header = null;
6464
_bodyBytes = null;
65-
_body = Memory<byte>.Empty;
65+
_body = ReadOnlyMemory<byte>.Empty;
6666
_remainingBodyBytes = 0;
6767
_offset = 0;
6868
_state = AssemblyState.ExpectingMethod;
6969
}
7070

71-
public IncomingCommand HandleFrame(in InboundFrame frame)
71+
public bool HandleFrame(in InboundFrame frame, out IncomingCommand command)
7272
{
73+
bool shallReturn = true;
7374
switch (_state)
7475
{
7576
case AssemblyState.ExpectingMethod:
@@ -79,18 +80,19 @@ public IncomingCommand HandleFrame(in InboundFrame frame)
7980
ParseHeaderFrame(in frame);
8081
break;
8182
case AssemblyState.ExpectingContentBody:
82-
ParseBodyFrame(in frame);
83+
shallReturn = ParseBodyFrame(in frame);
8384
break;
8485
}
8586

8687
if (_state != AssemblyState.Complete)
8788
{
88-
return IncomingCommand.Empty;
89+
command = IncomingCommand.Empty;
90+
return true;
8991
}
9092

91-
var result = new IncomingCommand(_method, _header, _body, _bodyBytes);
93+
command = new IncomingCommand(_method, _header, _body, _bodyBytes);
9294
Reset();
93-
return result;
95+
return shallReturn;
9496
}
9597

9698
private void ParseMethodFrame(in InboundFrame frame)
@@ -121,14 +123,10 @@ private void ParseHeaderFrame(in InboundFrame frame)
121123
}
122124

123125
_remainingBodyBytes = (int) totalBodyBytes;
124-
125-
// Is returned by IncomingCommand.Dispose in Session.HandleFrame
126-
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
127-
_body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes);
128126
UpdateContentBodyState();
129127
}
130128

131-
private void ParseBodyFrame(in InboundFrame frame)
129+
private bool ParseBodyFrame(in InboundFrame frame)
132130
{
133131
if (frame.Type != FrameType.FrameBody)
134132
{
@@ -141,10 +139,27 @@ private void ParseBodyFrame(in InboundFrame frame)
141139
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyBytes} bytes remaining, {payloadLength} bytes received");
142140
}
143141

144-
frame.Payload.CopyTo(_body.Slice(_offset));
142+
if (_bodyBytes is null)
143+
{
144+
// check for single frame payload for an early exit
145+
if (payloadLength == _remainingBodyBytes)
146+
{
147+
_bodyBytes = frame.TakeoverPayload();
148+
_body = frame.Payload;
149+
_state = AssemblyState.Complete;
150+
return false;
151+
}
152+
153+
// Is returned by IncomingCommand.ReturnPayload in Session.HandleFrame
154+
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
155+
_body = new ReadOnlyMemory<byte>(_bodyBytes, 0, _remainingBodyBytes);
156+
}
157+
158+
frame.Payload.Span.CopyTo(_bodyBytes.AsSpan(_offset));
145159
_remainingBodyBytes -= payloadLength;
146160
_offset += payloadLength;
147161
UpdateContentBodyState();
162+
return true;
148163
}
149164

150165
private void UpdateContentBodyState()

0 commit comments

Comments
 (0)