Skip to content

Commit 61b654f

Browse files
Merge pull request #911 from bollhals/cacheFrameHeader
cache the frameHeaderBuffer & rentedArray (cherry picked from commit 6482755)
1 parent 49d60dc commit 61b654f

File tree

3 files changed

+27
-63
lines changed

3 files changed

+27
-63
lines changed

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 24 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -171,12 +171,14 @@ public static Memory<byte> GetHeartbeatFrame()
171171
public readonly FrameType Type;
172172
public readonly int Channel;
173173
public readonly ReadOnlyMemory<byte> Payload;
174+
private readonly byte[] _rentedArray;
174175

175-
private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload)
176+
private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload, byte[] rentedArray)
176177
{
177178
Type = type;
178179
Channel = channel;
179180
Payload = payload;
181+
_rentedArray = rentedArray;
180182
}
181183

182184
private static void ProcessProtocolHeader(Stream reader)
@@ -210,17 +212,12 @@ private static void ProcessProtocolHeader(Stream reader)
210212
}
211213
}
212214

213-
internal static InboundFrame ReadFrom(Stream reader)
215+
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
214216
{
215217
int type = default;
216-
217218
try
218219
{
219220
type = reader.ReadByte();
220-
if (type == -1)
221-
{
222-
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
223-
}
224221
}
225222
catch (IOException ioe)
226223
{
@@ -237,51 +234,51 @@ internal static InboundFrame ReadFrom(Stream reader)
237234
ExceptionDispatchInfo.Capture(ioe.InnerException).Throw();
238235
}
239236

240-
if (type == 'A')
237+
switch (type)
241238
{
242-
// Probably an AMQP protocol header, otherwise meaningless
243-
ProcessProtocolHeader(reader);
239+
case -1:
240+
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
241+
case 'A':
242+
// Probably an AMQP protocol header, otherwise meaningless
243+
ProcessProtocolHeader(reader);
244+
break;
244245
}
245246

246-
Span<byte> headerBytes = stackalloc byte[6];
247-
reader.Read(headerBytes);
248-
int channel = NetworkOrderDeserializer.ReadUInt16(headerBytes);
249-
int payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2)); // FIXME - throw exn on unreasonable value
247+
reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
248+
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
249+
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value
250250

251+
const int EndMarkerLength = 1;
251252
// Is returned by InboundFrame.Dispose in Connection.MainLoopIteration
252-
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(payloadSize);
253-
Memory<byte> payload = new Memory<byte>(payloadBytes, 0, payloadSize);
253+
var readSize = payloadSize + EndMarkerLength;
254+
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
254255
int bytesRead = 0;
255256
try
256257
{
257-
while (bytesRead < payloadSize)
258+
while (bytesRead < readSize)
258259
{
259-
bytesRead += reader.Read(payload.Slice(bytesRead, payloadSize - bytesRead));
260+
bytesRead += reader.Read(payloadBytes, bytesRead, readSize - bytesRead);
260261
}
261262
}
262263
catch (Exception)
263264
{
264265
// Early EOF.
265266
ArrayPool<byte>.Shared.Return(payloadBytes);
266-
throw new MalformedFrameException($"Short frame - expected to read {payloadSize} bytes, only got {bytesRead} bytes");
267+
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes, only got {bytesRead} bytes");
267268
}
268269

269-
int frameEndMarker = reader.ReadByte();
270-
if (frameEndMarker != Constants.FrameEnd)
270+
if (payloadBytes[payloadSize] != Constants.FrameEnd)
271271
{
272272
ArrayPool<byte>.Shared.Return(payloadBytes);
273-
throw new MalformedFrameException($"Bad frame end marker: {frameEndMarker}");
273+
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
274274
}
275275

276-
return new InboundFrame((FrameType)type, channel, payload);
276+
return new InboundFrame((FrameType)type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
277277
}
278278

279279
public void Dispose()
280280
{
281-
if (MemoryMarshal.TryGetArray(Payload, out ArraySegment<byte> segment))
282-
{
283-
ArrayPool<byte>.Shared.Return(segment.Array);
284-
}
281+
ArrayPool<byte>.Shared.Return(_rentedArray);
285282
}
286283

287284
public override string ToString()

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,15 @@ class SocketFrameHandler : IFrameHandler
8383
private readonly ChannelReader<Memory<byte>> _channelReader;
8484
private readonly Task _writerTask;
8585
private readonly object _semaphore = new object();
86+
private readonly byte[] _frameHeaderBuffer;
8687
private bool _closed;
8788

8889
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
8990
Func<AddressFamily, ITcpClient> socketFactory,
9091
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
9192
{
9293
Endpoint = endpoint;
94+
_frameHeaderBuffer = new byte[6];
9395
var channel = Channel.CreateUnbounded<Memory<byte>>(
9496
new UnboundedChannelOptions
9597
{
@@ -224,7 +226,7 @@ public void Close()
224226

225227
public InboundFrame ReadFrame()
226228
{
227-
return RabbitMQ.Client.Impl.InboundFrame.ReadFrom(_reader);
229+
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer);
228230
}
229231

230232
public void SendHeader()

projects/RabbitMQ.Client/util/StreamExtensions.cs

Lines changed: 0 additions & 35 deletions
This file was deleted.

0 commit comments

Comments
 (0)