Skip to content

Commit f823dc7

Browse files
committed
Introduce SerializedFramesContainer, but it does not fix the issue.
1 parent 60e3928 commit f823dc7

File tree

7 files changed

+71
-46
lines changed

7 files changed

+71
-46
lines changed

projects/Benchmarks/WireFormatting/MethodFraming.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public class MethodFramingBasicAck
1919
public ushort Channel { get; set; }
2020

2121
[Benchmark]
22-
public ReadOnlyMemory<byte> BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
22+
internal SerializedFramesContainer BasicAckWrite() => Framing.SerializeToFrames(ref _basicAck, Channel);
2323
}
2424

2525
[Config(typeof(Config))]
@@ -41,13 +41,13 @@ public class MethodFramingBasicPublish
4141
public int FrameMax { get; set; }
4242

4343
[Benchmark]
44-
public ReadOnlyMemory<byte> BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
44+
internal SerializedFramesContainer BasicPublishWriteNonEmpty() => Framing.SerializeToFrames(ref _basicPublish, ref _properties, _body, Channel, FrameMax);
4545

4646
[Benchmark]
47-
public ReadOnlyMemory<byte> BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
47+
internal SerializedFramesContainer BasicPublishWrite() => Framing.SerializeToFrames(ref _basicPublish, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
4848

4949
[Benchmark]
50-
public ReadOnlyMemory<byte> BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
50+
internal SerializedFramesContainer BasicPublishMemoryWrite() => Framing.SerializeToFrames(ref _basicPublishMemory, ref _propertiesEmpty, _bodyEmpty, Channel, FrameMax);
5151
}
5252

5353
[Config(typeof(Config))]
@@ -60,6 +60,6 @@ public class MethodFramingChannelClose
6060
public ushort Channel { get; set; }
6161

6262
[Benchmark]
63-
public ReadOnlyMemory<byte> ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
63+
internal SerializedFramesContainer ChannelCloseWrite() => Framing.SerializeToFrames(ref _channelClose, Channel);
6464
}
6565
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,18 +403,18 @@ internal void OnCallbackException(CallbackExceptionEventArgs args)
403403
_callbackExceptionWrapper.Invoke(this, args);
404404
}
405405

406-
internal void Write(ReadOnlyMemory<byte> memory)
406+
internal void Write(SerializedFramesContainer frames)
407407
{
408-
var task = _frameHandler.WriteAsync(memory);
408+
var task = _frameHandler.WriteAsync(frames);
409409
if (!task.IsCompletedSuccessfully)
410410
{
411411
task.AsTask().GetAwaiter().GetResult();
412412
}
413413
}
414414

415-
internal ValueTask WriteAsync(ReadOnlyMemory<byte> memory)
415+
internal ValueTask WriteAsync(SerializedFramesContainer frames)
416416
{
417-
return _frameHandler.WriteAsync(memory);
417+
return _frameHandler.WriteAsync(frames);
418418
}
419419

420420
public void Dispose()

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,24 @@
4343

4444
namespace RabbitMQ.Client.Impl
4545
{
46+
internal class SerializedFramesContainer
47+
{
48+
private readonly int _size;
49+
private readonly ReadOnlyMemory<byte> _memory;
50+
private readonly byte[] _rentedArray;
51+
52+
internal SerializedFramesContainer(int size, ReadOnlyMemory<byte> memory, byte[] rentedArray)
53+
{
54+
_size = size;
55+
_memory = memory;
56+
_rentedArray = rentedArray;
57+
}
58+
59+
internal int Size => _size;
60+
internal ReadOnlyMemory<byte> Memory => _memory;
61+
internal byte[] RentedArray => _rentedArray;
62+
}
63+
4664
internal static class Framing
4765
{
4866
/* +------------+---------+----------------+---------+------------------+
@@ -139,31 +157,33 @@ internal static class Heartbeat
139157
///</summary>
140158
private static ReadOnlySpan<byte> Payload => new byte[] { Constants.FrameHeartbeat, 0, 0, 0, 0, 0, 0, Constants.FrameEnd };
141159

142-
public static ReadOnlyMemory<byte> GetHeartbeatFrame()
160+
public static SerializedFramesContainer GetHeartbeatFrame()
143161
{
144162
// Is returned by SocketFrameHandler.WriteLoop
145163
byte[] buffer = ClientArrayPool.Rent(FrameSize);
146164
Payload.CopyTo(buffer);
147-
return new ReadOnlyMemory<byte>(buffer, 0, FrameSize);
165+
var mem = new ReadOnlyMemory<byte>(buffer, 0, FrameSize);
166+
return new SerializedFramesContainer(FrameSize, mem, buffer);
148167
}
149168
}
150169

151170
[MethodImpl(MethodImplOptions.AggressiveInlining)]
152-
public static ReadOnlyMemory<byte> SerializeToFrames<T>(ref T method, ushort channelNumber)
171+
public static SerializedFramesContainer SerializeToFrames<T>(ref T method, ushort channelNumber)
153172
where T : struct, IOutgoingAmqpMethod
154173
{
155174
int size = Method.FrameSize + method.GetRequiredBufferSize();
156175

157176
// Will be returned by SocketFrameWriter.WriteLoop
158-
var array = ClientArrayPool.Rent(size);
177+
byte[] array = ClientArrayPool.Rent(size);
159178
int offset = Method.WriteTo(array, channelNumber, ref method);
160179

161180
System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}");
162-
return new ReadOnlyMemory<byte>(array, 0, size);
181+
var mem = new ReadOnlyMemory<byte>(array, 0, size);
182+
return new SerializedFramesContainer(size, mem, array);
163183
}
164184

165185
[MethodImpl(MethodImplOptions.AggressiveInlining)]
166-
public static ReadOnlyMemory<byte> SerializeToFrames<TMethod, THeader>(ref TMethod method, ref THeader header, ReadOnlyMemory<byte> body, ushort channelNumber, int maxBodyPayloadBytes)
186+
public static SerializedFramesContainer SerializeToFrames<TMethod, THeader>(ref TMethod method, ref THeader header, ReadOnlyMemory<byte> body, ushort channelNumber, int maxBodyPayloadBytes)
167187
where TMethod : struct, IOutgoingAmqpMethod
168188
where THeader : IAmqpHeader
169189
{
@@ -173,11 +193,11 @@ public static ReadOnlyMemory<byte> SerializeToFrames<TMethod, THeader>(ref TMeth
173193
BodySegment.FrameSize * GetBodyFrameCount(maxBodyPayloadBytes, remainingBodyBytes) + remainingBodyBytes;
174194

175195
// Will be returned by SocketFrameWriter.WriteLoop
176-
var array = ClientArrayPool.Rent(size);
196+
byte[] array = ClientArrayPool.Rent(size);
177197

178198
int offset = Method.WriteTo(array, channelNumber, ref method);
179199
offset += Header.WriteTo(array.AsSpan(offset), channelNumber, ref header, remainingBodyBytes);
180-
var bodySpan = body.Span;
200+
ReadOnlySpan<byte> bodySpan = body.Span;
181201
while (remainingBodyBytes > 0)
182202
{
183203
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
@@ -186,7 +206,8 @@ public static ReadOnlyMemory<byte> SerializeToFrames<TMethod, THeader>(ref TMeth
186206
}
187207

188208
System.Diagnostics.Debug.Assert(offset == size, $"Serialized to wrong size, expect {size}, offset {offset}");
189-
return new ReadOnlyMemory<byte>(array, 0, size);
209+
var mem = new ReadOnlyMemory<byte>(array, 0, size);
210+
return new SerializedFramesContainer(size, mem, array);
190211
}
191212

192213
[MethodImpl(MethodImplOptions.AggressiveInlining)]

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,6 @@ internal interface IFrameHandler
6767

6868
ValueTask SendHeaderAsync();
6969

70-
ValueTask WriteAsync(ReadOnlyMemory<byte> memory);
70+
ValueTask WriteAsync(SerializedFramesContainer frames);
7171
}
7272
}

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,10 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers;
34-
using System.Diagnostics;
3533
using System.IO;
3634
using System.IO.Pipelines;
3735
using System.Net;
3836
using System.Net.Sockets;
39-
using System.Runtime.InteropServices;
4037
using System.Threading;
4138
using System.Threading.Channels;
4239
using System.Threading.Tasks;
@@ -91,21 +88,22 @@ internal sealed class SocketFrameHandler : IFrameHandler
9188
{
9289
private readonly AmqpTcpEndpoint _amqpTcpEndpoint;
9390
private readonly ITcpClient _socket;
94-
private readonly ChannelWriter<ReadOnlyMemory<byte>> _channelWriter;
95-
private readonly ChannelReader<ReadOnlyMemory<byte>> _channelReader;
96-
private readonly PipeWriter _pipeWriter;
91+
private readonly ChannelWriter<SerializedFramesContainer> _channelWriter;
92+
private readonly ChannelReader<SerializedFramesContainer> _channelReader;
9793
private readonly PipeReader _pipeReader;
94+
private readonly PipeWriter _pipeWriter;
9895
private readonly Task _writerTask;
9996
private readonly object _semaphore = new object();
10097
private bool _closed;
101-
private static ReadOnlySpan<byte> ProtocolHeader => new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P', 0, 0, 9, 1 };
98+
99+
private static ReadOnlyMemory<byte> ProtocolHeader => new byte[] { (byte)'A', (byte)'M', (byte)'Q', (byte)'P', 0, 0, 9, 1 };
102100

103101
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
104102
Func<AddressFamily, ITcpClient> socketFactory,
105103
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
106104
{
107105
_amqpTcpEndpoint = endpoint;
108-
var channel = Channel.CreateBounded<ReadOnlyMemory<byte>>(
106+
var channel = Channel.CreateBounded<SerializedFramesContainer>(
109107
new BoundedChannelOptions(128)
110108
{
111109
AllowSynchronousContinuations = false,
@@ -167,8 +165,8 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
167165
}
168166
}
169167

170-
_pipeWriter = PipeWriter.Create(netstream);
171168
_pipeReader = PipeReader.Create(netstream);
169+
_pipeWriter = PipeWriter.Create(netstream);
172170

173171
WriteTimeout = writeTimeout;
174172
_writerTask = Task.Run(WriteLoop);
@@ -275,22 +273,21 @@ public bool TryReadFrame(out InboundFrame frame)
275273

276274
public async ValueTask SendHeaderAsync()
277275
{
278-
_pipeWriter.Write(ProtocolHeader);
276+
await _pipeWriter.WriteAsync(ProtocolHeader).ConfigureAwait(false);
279277
await _pipeWriter.FlushAsync().ConfigureAwait(false);
280278
}
281279

282-
public async ValueTask WriteAsync(ReadOnlyMemory<byte> memory)
280+
public async ValueTask WriteAsync(SerializedFramesContainer frames)
283281
{
284282
if (_closed)
285283
{
286284
// TODO return memory?
287-
Debug.Assert(MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment));
288-
ClientArrayPool.Return(segment.Array);
285+
ClientArrayPool.Return(frames.RentedArray);
289286
await Task.Yield();
290287
}
291288
else
292289
{
293-
await _channelWriter.WriteAsync(memory).ConfigureAwait(false);
290+
await _channelWriter.WriteAsync(frames).ConfigureAwait(false);
294291
}
295292
}
296293

@@ -300,20 +297,21 @@ private async Task WriteLoop()
300297
{
301298
while (await _channelReader.WaitToReadAsync().ConfigureAwait(false))
302299
{
303-
if (_channelReader.TryRead(out ReadOnlyMemory<byte> memory))
300+
if (_channelReader.TryRead(out SerializedFramesContainer sfc))
304301
{
305302
try
306303
{
307-
await _pipeWriter.WriteAsync(memory).ConfigureAwait(false);
304+
await _pipeWriter.WriteAsync(sfc.Memory).ConfigureAwait(false);
308305
}
309306
finally
310307
{
311-
Debug.Assert(MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment));
312-
RabbitMqClientEventSource.Log.CommandSent(segment.Count);
313-
ClientArrayPool.Return(segment.Array);
308+
byte[] rentedArray = sfc.RentedArray;
309+
RabbitMqClientEventSource.Log.CommandSent(sfc.Size);
310+
ClientArrayPool.Return(rentedArray);
314311
}
315-
await _pipeWriter.FlushAsync().ConfigureAwait(false);
316312
}
313+
314+
await _pipeWriter.FlushAsync().ConfigureAwait(false);
317315
}
318316
}
319317
catch (Exception ex)

projects/RabbitMQ.Client/util/ClientArrayPool.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System;
3233
using System.Buffers;
3334
using System.Runtime.CompilerServices;
3435

@@ -38,6 +39,14 @@ internal static class ClientArrayPool
3839
{
3940
private static readonly bool s_useArrayPool = false;
4041

42+
static ClientArrayPool()
43+
{
44+
if (false == bool.TryParse(Environment.GetEnvironmentVariable("RABBITMQ_CLIENT_USE_ARRAY_POOL"), out s_useArrayPool))
45+
{
46+
s_useArrayPool = false;
47+
}
48+
}
49+
4150
[MethodImpl(MethodImplOptions.AggressiveInlining)]
4251
internal static byte[] Rent(int minimumLength)
4352
{

projects/Test/Unit/TestFrameFormatting.cs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,9 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Runtime.InteropServices;
3433
using RabbitMQ.Client;
3534
using RabbitMQ.Client.Framing.Impl;
35+
using RabbitMQ.Client.Impl;
3636
using Xunit;
3737

3838
namespace Test.Unit
@@ -42,8 +42,8 @@ public class TestFrameFormatting : WireFormattingFixture
4242
[Fact]
4343
public void HeartbeatFrame()
4444
{
45-
ReadOnlyMemory<byte> memory = RabbitMQ.Client.Impl.Framing.Heartbeat.GetHeartbeatFrame();
46-
ReadOnlySpan<byte> frameSpan = memory.Span;
45+
SerializedFramesContainer sfc = Framing.Heartbeat.GetHeartbeatFrame();
46+
ReadOnlySpan<byte> frameSpan = sfc.Memory.Span;
4747

4848
try
4949
{
@@ -59,10 +59,7 @@ public void HeartbeatFrame()
5959
}
6060
finally
6161
{
62-
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
63-
{
64-
ClientArrayPool.Return(segment.Array);
65-
}
62+
ClientArrayPool.Return(sfc.RentedArray);
6663
}
6764
}
6865

0 commit comments

Comments
 (0)