Skip to content

Commit 1a0b5b3

Browse files
bollhalsmichaelklishin
authored andcommitted
feedback from pull request
1 parent f15fd77 commit 1a0b5b3

File tree

6 files changed

+43
-15
lines changed

6 files changed

+43
-15
lines changed

projects/RabbitMQ.Client/client/impl/Command.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ internal void Transmit(ushort channelNumber, Connection connection)
7777
{
7878
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
7979
var size = GetMaxSize(maxBodyPayloadBytes);
80+
81+
// Will be returned by SocketFrameWriter.WriteLoop
8082
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
8183
var span = memory.Span;
8284

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ public Command HandleFrame(in InboundFrame f)
100100
}
101101

102102
m_remainingBodyBytes = (int)totalBodyBytes;
103+
104+
// Is returned by Command.Dispose in Session.HandleFrame
103105
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
104106
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
105107
UpdateContentBodyState();

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -903,9 +903,7 @@ public void HeartbeatWriteTimerCallback(object state)
903903
{
904904
if (!_closed)
905905
{
906-
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(Client.Impl.Framing.Heartbeat.FrameSize), 0, Client.Impl.Framing.Heartbeat.FrameSize);
907-
Client.Impl.Framing.Heartbeat.Payload.CopyTo(memory.Span);
908-
Write(memory);
906+
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
909907
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
910908
}
911909
}

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,14 +145,24 @@ internal static class Heartbeat
145145
/* Empty frame */
146146
public const int FrameSize = BaseFrameSize;
147147

148-
public static Span<byte> Payload => new byte[]
148+
/// <summary>
149+
/// Compiler trick to directly refer to static data in the assembly, see here: https://github.com/dotnet/roslyn/pull/24621
150+
/// </summary>
151+
private static ReadOnlySpan<byte> Payload => new byte[]
149152
{
150153
Constants.FrameHeartbeat,
151154
0, 0, // channel
152155
0, 0, 0, 0, // payload length
153156
Constants.FrameEnd
154157
};
155158

159+
public static Memory<byte> GetHeartbeatFrame()
160+
{
161+
// Is returned by SocketFrameHandler.WriteLoop
162+
var buffer = ArrayPool<byte>.Shared.Rent(FrameSize);
163+
Payload.CopyTo(buffer);
164+
return new Memory<byte>(buffer, 0, FrameSize);
165+
}
156166
}
157167
}
158168

@@ -254,6 +264,8 @@ internal static InboundFrame ReadFrom(Stream reader)
254264
reader.Read(headerBytes);
255265
int channel = NetworkOrderDeserializer.ReadUInt16(headerBytes);
256266
int payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2)); // FIXME - throw exn on unreasonable value
267+
268+
// Is returned by InboundFrame.Dispose in Connection.MainLoopIteration
257269
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(payloadSize);
258270
Memory<byte> payload = new Memory<byte>(payloadBytes, 0, payloadSize);
259271
int bytesRead = 0;

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void Close()
200200
try
201201
{
202202
_channelWriter.Complete();
203-
_writerTask.Wait();
203+
_writerTask.GetAwaiter().GetResult();
204204
}
205205
catch(Exception)
206206
{

projects/Unit/TestFrameFormatting.cs

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@
3838
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3939
//---------------------------------------------------------------------------
4040

41+
using System;
42+
using System.Buffers;
43+
using System.Runtime.InteropServices;
4144
using NUnit.Framework;
4245
using RabbitMQ.Client.Framing.Impl;
4346

@@ -49,17 +52,28 @@ class TestFrameFormatting : WireFormattingFixture
4952
[Test]
5053
public void HeartbeatFrame()
5154
{
52-
var frameBytes = Impl.Framing.Heartbeat.Payload.ToArray();
55+
var memory = Impl.Framing.Heartbeat.GetHeartbeatFrame();
56+
var frameSpan = memory.Span;
5357

54-
Assert.AreEqual(8, frameBytes.Length);
55-
Assert.AreEqual(Constants.FrameHeartbeat, frameBytes[0]);
56-
Assert.AreEqual(0, frameBytes[1]); // channel
57-
Assert.AreEqual(0, frameBytes[2]); // channel
58-
Assert.AreEqual(0, frameBytes[3]); // payload size
59-
Assert.AreEqual(0, frameBytes[4]); // payload size
60-
Assert.AreEqual(0, frameBytes[5]); // payload size
61-
Assert.AreEqual(0, frameBytes[6]); // payload size
62-
Assert.AreEqual(Constants.FrameEnd, frameBytes[7]);
58+
try
59+
{
60+
Assert.AreEqual(8, frameSpan.Length);
61+
Assert.AreEqual(Constants.FrameHeartbeat, frameSpan[0]);
62+
Assert.AreEqual(0, frameSpan[1]); // channel
63+
Assert.AreEqual(0, frameSpan[2]); // channel
64+
Assert.AreEqual(0, frameSpan[3]); // payload size
65+
Assert.AreEqual(0, frameSpan[4]); // payload size
66+
Assert.AreEqual(0, frameSpan[5]); // payload size
67+
Assert.AreEqual(0, frameSpan[6]); // payload size
68+
Assert.AreEqual(Constants.FrameEnd, frameSpan[7]);
69+
}
70+
finally
71+
{
72+
if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> segment))
73+
{
74+
ArrayPool<byte>.Shared.Return(segment.Array);
75+
}
76+
}
6377
}
6478

6579
[Test]

0 commit comments

Comments
 (0)