Skip to content

fix issue 868 #878

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
EmitLine("");
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
EmitLine(" {");
var lastWasBitClass = false;
foreach (AmqpField f in m.m_Fields)
{
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
string mangleClass = MangleClass(ResolveDomain(f.Domain));
if (mangleClass != "Bit")
{
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
lastWasBitClass = false;
}
}
else
{
lastWasBitClass = true;
}

EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
}
if (lastWasBitClass)
{
EmitLine($" writer.EndBits();");
}
EmitLine(" }");
EmitLine("");
Expand Down Expand Up @@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)

public void EmitMethodArgumentReader()
{
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
EmitLine(" {");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
EmitLine(" if(result != null)");
EmitLine(" {");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
EmitLine(" result.ReadArgumentsFrom(ref reader);");
EmitLine(" return result;");
EmitLine(" }");
Expand Down
1 change: 1 addition & 0 deletions projects/RabbitMQ.Client/RabbitMQ.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
<MinVerVerbosity>minimal</MinVerVerbosity>
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
<PackageOutputPath>..\..\packages</PackageOutputPath>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">
Expand Down
72 changes: 42 additions & 30 deletions projects/RabbitMQ.Client/client/impl/Command.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@

using System;
using System.Buffers;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using RabbitMQ.Client.Exceptions;
using RabbitMQ.Client.Framing.Impl;

namespace RabbitMQ.Client.Impl
Expand All @@ -57,11 +55,6 @@ class Command : IDisposable
private const int EmptyFrameSize = 8;
private readonly bool _returnBufferOnDispose;

static Command()
{
CheckEmptyFrameSize();
}

internal Command(MethodBase method) : this(method, null, null, false)
{
}
Expand All @@ -80,38 +73,57 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>

internal MethodBase Method { get; private set; }

public static void CheckEmptyFrameSize()
internal void Transmit(ushort channelNumber, Connection connection)
{
var f = new EmptyOutboundFrame();
byte[] b = new byte[f.GetMinimumBufferSize()];
f.WriteTo(b);
long actualLength = f.ByteCount;
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
var size = GetMaxSize(maxBodyPayloadBytes);

if (EmptyFrameSize != actualLength)
// Will be returned by SocketFrameWriter.WriteLoop
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
var span = memory.Span;

var offset = Framing.Method.WriteTo(span, channelNumber, Method);
if (Method.HasContent)
{
string message =
string.Format("EmptyFrameSize is incorrect - defined as {0} where the computed value is in fact {1}.",
EmptyFrameSize,
actualLength);
throw new ProtocolViolationException(message);
int remainingBodyBytes = Body.Length;
offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, Header, remainingBodyBytes);
var bodySpan = Body.Span;
while (remainingBodyBytes > 0)
{
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
offset += Framing.BodySegment.WriteTo(span.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
remainingBodyBytes -= frameSize;
}
}

if (offset != size)
{
throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}");
}

connection.Write(memory);
}

internal void Transmit(int channelNumber, Connection connection)
private int GetMaxSize(int maxPayloadBytes)
{
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
if (Method.HasContent)
if (!Method.HasContent)
{
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
{
int remaining = Body.Length - offset;
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
}
return Framing.Method.FrameSize + Method.GetRequiredBufferSize();
}

return Framing.Method.FrameSize + Method.GetRequiredBufferSize() +
Framing.Header.FrameSize + Header.GetRequiredPayloadBufferSize() +
Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + Body.Length;
}

private int GetBodyFrameCount(int maxPayloadBytes)
{
if (maxPayloadBytes == int.MaxValue)
{
return 1;
}

return (Body.Length + maxPayloadBytes - 1) / maxPayloadBytes;
}

public void Dispose()
Expand Down
11 changes: 8 additions & 3 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,22 +81,27 @@ public Command HandleFrame(in InboundFrame f)
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload);
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
m_header.ReadFrom(span.Slice(12));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
}

m_remainingBodyBytes = (int)totalBodyBytes;

// Is returned by Command.Dispose in Session.HandleFrame
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
UpdateContentBodyState();
Expand Down
8 changes: 4 additions & 4 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
//---------------------------------------------------------------------------

using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Net;
Expand All @@ -62,7 +63,6 @@ internal sealed class Connection : IConnection
private readonly object _eventLock = new object();

///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();

private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);

Expand Down Expand Up @@ -902,7 +902,7 @@ public void HeartbeatWriteTimerCallback(object state)
{
if (!_closed)
{
WriteFrame(_heartbeatFrame);
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
}
}
Expand Down Expand Up @@ -939,9 +939,9 @@ public override string ToString()
return string.Format("Connection({0},{1})", _id, Endpoint);
}

public void WriteFrame(OutboundFrame f)
public void Write(Memory<byte> memory)
{
_frameHandler.WriteFrame(f);
_frameHandler.Write(memory);
}

public void UpdateSecret(string newSecret, string reason)
Expand Down
26 changes: 2 additions & 24 deletions projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
using System;
using System.Text;

using RabbitMQ.Util;

namespace RabbitMQ.Client.Impl
{
abstract class ContentHeaderBase : IContentHeader
Expand All @@ -67,35 +65,15 @@ public virtual object Clone()
///<summary>
/// Fill this instance from the given byte buffer stream.
///</summary>
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
internal void ReadFrom(ReadOnlySpan<byte> span)
{
// Skipping the first two bytes since they arent used (weight - not currently used)
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span);
ReadPropertiesFrom(ref reader);
return bodySize;
}

internal abstract void ReadPropertiesFrom(ref ContentHeaderPropertyReader reader);
internal abstract void WritePropertiesTo(ref ContentHeaderPropertyWriter writer);

private const ushort ZERO = 0;

internal int WriteTo(Memory<byte> memory, ulong bodySize)
{
NetworkOrderSerializer.WriteUInt16(memory.Span, ZERO); // Weight - not used
NetworkOrderSerializer.WriteUInt64(memory.Slice(2).Span, bodySize);

ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
WritePropertiesTo(ref writer);
return 10 + writer.Offset;
}
public int GetRequiredBufferSize()
{
// The first 10 bytes are the Weight (2 bytes) + body size (8 bytes)
return 10 + GetRequiredPayloadBufferSize();
}

public abstract int GetRequiredPayloadBufferSize();
}
}
Loading