Skip to content

cache the frameHeaderBuffer & rentedArray #911

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ public static Memory<byte> GetHeartbeatFrame()
public readonly FrameType Type;
public readonly int Channel;
public readonly ReadOnlyMemory<byte> Payload;
private readonly byte[] _rentedArray;

private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload)
private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload, byte[] rentedArray)
{
Type = type;
Channel = channel;
Payload = payload;
_rentedArray = rentedArray;
}

private static void ProcessProtocolHeader(Stream reader)
Expand Down Expand Up @@ -210,17 +212,12 @@ private static void ProcessProtocolHeader(Stream reader)
}
}

internal static InboundFrame ReadFrom(Stream reader)
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer)
{
int type = default;

try
{
type = reader.ReadByte();
if (type == -1)
{
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
}
}
catch (IOException ioe)
{
Expand All @@ -237,51 +234,51 @@ internal static InboundFrame ReadFrom(Stream reader)
ExceptionDispatchInfo.Capture(ioe.InnerException).Throw();
}

if (type == 'A')
switch (type)
{
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader);
case -1:
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
case 'A':
// Probably an AMQP protocol header, otherwise meaningless
ProcessProtocolHeader(reader);
break;
}

Span<byte> headerBytes = stackalloc byte[6];
reader.Read(headerBytes);
int channel = NetworkOrderDeserializer.ReadUInt16(headerBytes);
int payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2)); // FIXME - throw exn on unreasonable value
reader.Read(frameHeaderBuffer, 0, frameHeaderBuffer.Length);
int channel = NetworkOrderDeserializer.ReadUInt16(new ReadOnlySpan<byte>(frameHeaderBuffer));
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4)); // FIXME - throw exn on unreasonable value

const int EndMarkerLength = 1;
// Is returned by InboundFrame.Dispose in Connection.MainLoopIteration
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(payloadSize);
Memory<byte> payload = new Memory<byte>(payloadBytes, 0, payloadSize);
var readSize = payloadSize + EndMarkerLength;
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
int bytesRead = 0;
try
{
while (bytesRead < payloadSize)
while (bytesRead < readSize)
{
bytesRead += reader.Read(payload.Slice(bytesRead, payloadSize - bytesRead));
bytesRead += reader.Read(payloadBytes, bytesRead, readSize - bytesRead);
}
}
catch (Exception)
{
// Early EOF.
ArrayPool<byte>.Shared.Return(payloadBytes);
throw new MalformedFrameException($"Short frame - expected to read {payloadSize} bytes, only got {bytesRead} bytes");
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes, only got {bytesRead} bytes");
}

int frameEndMarker = reader.ReadByte();
if (frameEndMarker != Constants.FrameEnd)
if (payloadBytes[payloadSize] != Constants.FrameEnd)
{
ArrayPool<byte>.Shared.Return(payloadBytes);
throw new MalformedFrameException($"Bad frame end marker: {frameEndMarker}");
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
}

return new InboundFrame((FrameType)type, channel, payload);
return new InboundFrame((FrameType)type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
}

public void Dispose()
{
if (MemoryMarshal.TryGetArray(Payload, out ArraySegment<byte> segment))
{
ArrayPool<byte>.Shared.Return(segment.Array);
}
ArrayPool<byte>.Shared.Return(_rentedArray);
}

public override string ToString()
Expand Down
4 changes: 3 additions & 1 deletion projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ class SocketFrameHandler : IFrameHandler
private readonly ChannelReader<Memory<byte>> _channelReader;
private readonly Task _writerTask;
private readonly object _semaphore = new object();
private readonly byte[] _frameHeaderBuffer;
private bool _closed;

public SocketFrameHandler(AmqpTcpEndpoint endpoint,
Func<AddressFamily, ITcpClient> socketFactory,
TimeSpan connectionTimeout, TimeSpan readTimeout, TimeSpan writeTimeout)
{
Endpoint = endpoint;
_frameHeaderBuffer = new byte[6];
var channel = Channel.CreateUnbounded<Memory<byte>>(
new UnboundedChannelOptions
{
Expand Down Expand Up @@ -241,7 +243,7 @@ public void Close()

public InboundFrame ReadFrame()
{
return RabbitMQ.Client.Impl.InboundFrame.ReadFrom(_reader);
return InboundFrame.ReadFrom(_reader, _frameHeaderBuffer);
}

public void SendHeader()
Expand Down
35 changes: 0 additions & 35 deletions projects/RabbitMQ.Client/util/StreamExtensions.cs

This file was deleted.