Skip to content

Commit 28082c8

Browse files
committed
Adding extension method for Stream to support reading Span<byte> allowing us to use "stackalloc".
1 parent 619e5e7 commit 28082c8

File tree

2 files changed

+38
-31
lines changed

2 files changed

+38
-31
lines changed

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -251,44 +251,35 @@ internal static InboundFrame ReadFrom(Stream reader)
251251
ProcessProtocolHeader(reader);
252252
}
253253

254-
byte[] headerBytes = null;
254+
Span<byte> headerBytes = stackalloc byte[6];
255+
reader.Read(headerBytes);
256+
int channel = NetworkOrderDeserializer.ReadUInt16(headerBytes);
257+
int payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2)); // FIXME - throw exn on unreasonable value
258+
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(payloadSize);
259+
Memory<byte> payload = new Memory<byte>(payloadBytes, 0, payloadSize);
260+
int bytesRead = 0;
255261
try
256262
{
257-
headerBytes = ArrayPool<byte>.Shared.Rent(6);
258-
Memory<byte> headerSlice = new Memory<byte>(headerBytes, 0, 6);
259-
reader.Read(headerSlice);
260-
int channel = NetworkOrderDeserializer.ReadUInt16(headerSlice);
261-
int payloadSize = NetworkOrderDeserializer.ReadInt32(headerSlice.Slice(2)); // FIXME - throw exn on unreasonable value
262-
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(payloadSize);
263-
Memory<byte> payload = new Memory<byte>(payloadBytes, 0, payloadSize);
264-
int bytesRead = 0;
265-
try
263+
while (bytesRead < payloadSize)
266264
{
267-
while (bytesRead < payloadSize)
268-
{
269-
bytesRead += reader.Read(payload.Slice(bytesRead, payloadSize - bytesRead));
270-
}
265+
bytesRead += reader.Read(payload.Slice(bytesRead, payloadSize - bytesRead));
271266
}
272-
catch (Exception)
273-
{
274-
// Early EOF.
275-
ArrayPool<byte>.Shared.Return(payloadBytes);
276-
throw new MalformedFrameException($"Short frame - expected to read {payloadSize} bytes, only got {bytesRead} bytes");
277-
}
278-
279-
int frameEndMarker = reader.ReadByte();
280-
if (frameEndMarker != Constants.FrameEnd)
281-
{
282-
ArrayPool<byte>.Shared.Return(payloadBytes);
283-
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
284-
}
285-
286-
return new InboundFrame((FrameType)type, channel, payload);
287267
}
288-
finally
268+
catch (Exception)
289269
{
290-
ArrayPool<byte>.Shared.Return(headerBytes);
270+
// Early EOF.
271+
ArrayPool<byte>.Shared.Return(payloadBytes);
272+
throw new MalformedFrameException($"Short frame - expected to read {payloadSize} bytes, only got {bytesRead} bytes");
291273
}
274+
275+
int frameEndMarker = reader.ReadByte();
276+
if (frameEndMarker != Constants.FrameEnd)
277+
{
278+
ArrayPool<byte>.Shared.Return(payloadBytes);
279+
throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
280+
}
281+
282+
return new InboundFrame((FrameType)type, channel, payload);
292283
}
293284

294285
public void Dispose()

projects/RabbitMQ.Client/util/StreamExtensions.cs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Buffers;
23
using System.IO;
34
using System.Runtime.InteropServices;
45

@@ -15,5 +16,20 @@ internal static int Read(this Stream stream, Memory<byte> memory)
1516

1617
throw new InvalidOperationException("Unable to get array segment from memory.");
1718
}
19+
20+
internal static int Read(this Stream stream, Span<byte> buffer)
21+
{
22+
byte[] sharedBuffer = ArrayPool<byte>.Shared.Rent(buffer.Length);
23+
try
24+
{
25+
int numRead = stream.Read(sharedBuffer, 0, buffer.Length);
26+
new Span<byte>(sharedBuffer, 0, numRead).CopyTo(buffer);
27+
return numRead;
28+
}
29+
finally
30+
{
31+
ArrayPool<byte>.Shared.Return(sharedBuffer);
32+
}
33+
}
1834
}
1935
}

0 commit comments

Comments
 (0)