Skip to content

Backport #902 to 6.x #903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion projects/Apigen/apigen/Apigen.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ string GetParameterString(ParameterInfo pi)
throw new NotImplementedException();
}

EmitLine(" public override bool DispatchAsynchronous(Client.Impl.Command cmd) {");
EmitLine(" public override bool DispatchAsynchronous(in IncomingCommand cmd) {");
EmitLine(" switch ((cmd.Method.ProtocolClassId << 16) | cmd.Method.ProtocolMethodId)");
EmitLine(" {");
foreach (MethodInfo method in asynchronousHandlers)
Expand Down
22 changes: 6 additions & 16 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ public IConsumerDispatcher ConsumerDispatcher
{
get
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

return _delegate.ConsumerDispatcher;
}
Expand Down Expand Up @@ -477,14 +477,14 @@ public void Close(ShutdownEventArgs reason, bool abort)
}
}

public bool DispatchAsynchronous(Command cmd)
public bool DispatchAsynchronous(in IncomingCommand cmd)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

return _delegate.DispatchAsynchronous(cmd);
return _delegate.DispatchAsynchronous(in cmd);
}

public void FinishClose()
Expand All @@ -497,16 +497,6 @@ public void FinishClose()
_delegate.FinishClose();
}

public void HandleCommand(ISession session, Command cmd)
{
if (_disposed)
{
throw new ObjectDisposedException(GetType().FullName);
}

_delegate.HandleCommand(session, cmd);
}

public void OnBasicAck(BasicAckEventArgs args)
{
if (_disposed)
Expand Down
5 changes: 3 additions & 2 deletions projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,12 @@ namespace RabbitMQ.Client.Impl
{
class BasicPublishBatch : IBasicPublishBatch
{
private readonly List<Command> _commands = new List<Command>();
private readonly List<OutgoingCommand> _commands;
private readonly ModelBase _model;
internal BasicPublishBatch (ModelBase model)
{
_model = model;
_commands = new List<OutgoingCommand>();
}

public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
Expand All @@ -69,7 +70,7 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
_mandatory = mandatory
};

_commands.Add(new Command(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body, false));
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
}

public void Publish()
Expand Down
172 changes: 91 additions & 81 deletions projects/RabbitMQ.Client/client/impl/CommandAssembler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,117 +46,127 @@

namespace RabbitMQ.Client.Impl
{
enum AssemblyState
{
ExpectingMethod,
ExpectingContentHeader,
ExpectingContentBody,
Complete
}

class CommandAssembler
internal sealed class CommandAssembler
{
private const int MaxArrayOfBytesSize = 2_147_483_591;

public MethodBase m_method;
public ContentHeaderBase m_header;
public Memory<byte> m_body;
public ProtocolBase m_protocol;
public int m_remainingBodyBytes;
private readonly ProtocolBase _protocol;

private MethodBase _method;
private ContentHeaderBase _header;
private byte[] _bodyBytes;
private Memory<byte> _body;
private int _remainingBodyBytes;
private int _offset;
public AssemblyState m_state;
private AssemblyState _state;

public CommandAssembler(ProtocolBase protocol)
{
m_protocol = protocol;
_protocol = protocol;
Reset();
}

public Command HandleFrame(in InboundFrame f)
private void Reset()
{
_method = null;
_header = null;
_bodyBytes = null;
_body = Memory<byte>.Empty;
_remainingBodyBytes = 0;
_offset = 0;
_state = AssemblyState.ExpectingMethod;
}

public IncomingCommand HandleFrame(in InboundFrame frame)
{
switch (m_state)
switch (_state)
{
case AssemblyState.ExpectingMethod:
if (!f.IsMethod())
{
throw new UnexpectedFrameException(f.Type);
}
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
return CompletedCommand();
ParseMethodFrame(in frame);
break;
case AssemblyState.ExpectingContentHeader:
if (!f.IsHeader())
{
throw new UnexpectedFrameException(f.Type);
}

ReadOnlySpan<byte> span = f.Payload.Span;
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
m_header.ReadFrom(span.Slice(12));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
throw new UnexpectedFrameException(f.Type);
}

m_remainingBodyBytes = (int)totalBodyBytes;

// Is returned by Command.Dispose in Session.HandleFrame
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
UpdateContentBodyState();
return CompletedCommand();
ParseHeaderFrame(in frame);
break;
case AssemblyState.ExpectingContentBody:
if (!f.IsBody())
{
throw new UnexpectedFrameException(f.Type);
}

if (f.Payload.Length > m_remainingBodyBytes)
{
throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.Payload.Length} bytes received");
}

f.Payload.CopyTo(m_body.Slice(_offset));
m_remainingBodyBytes -= f.Payload.Length;
_offset += f.Payload.Length;
UpdateContentBodyState();
return CompletedCommand();
case AssemblyState.Complete:
default:
return null;
ParseBodyFrame(in frame);
break;
}

if (_state != AssemblyState.Complete)
{
return IncomingCommand.Empty;
}

var result = new IncomingCommand(_method, _header, _body, _bodyBytes);
Reset();
return result;
}

private void ParseMethodFrame(in InboundFrame frame)
{
if (frame.Type != FrameType.FrameMethod)
{
throw new UnexpectedFrameException(frame.Type);
}

_method = _protocol.DecodeMethodFrom(frame.Payload.Span);
_state = _method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
}

private Command CompletedCommand()
private void ParseHeaderFrame(in InboundFrame frame)
{
if (m_state == AssemblyState.Complete)
if (frame.Type != FrameType.FrameHeader)
{
Command result = new Command(m_method, m_header, m_body, true);
Reset();
return result;
throw new UnexpectedFrameException(frame.Type);
}
else

ReadOnlySpan<byte> span = frame.Payload.Span;
_header = _protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
_header.ReadFrom(span.Slice(12));
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
if (totalBodyBytes > MaxArrayOfBytesSize)
{
return null;
throw new UnexpectedFrameException(frame.Type);
}

_remainingBodyBytes = (int) totalBodyBytes;

// Is returned by IncomingCommand.Dispose in Session.HandleFrame
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
_body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes);
UpdateContentBodyState();
}

private void Reset()
private void ParseBodyFrame(in InboundFrame frame)
{
m_state = AssemblyState.ExpectingMethod;
m_method = null;
m_header = null;
m_body = null;
_offset = 0;
m_remainingBodyBytes = 0;
if (frame.Type != FrameType.FrameBody)
{
throw new UnexpectedFrameException(frame.Type);
}

int payloadLength = frame.Payload.Length;
if (payloadLength > _remainingBodyBytes)
{
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyBytes} bytes remaining, {payloadLength} bytes received");
}

frame.Payload.CopyTo(_body.Slice(_offset));
_remainingBodyBytes -= payloadLength;
_offset += payloadLength;
UpdateContentBodyState();
}

private void UpdateContentBodyState()
{
m_state = (m_remainingBodyBytes > 0)
? AssemblyState.ExpectingContentBody
: AssemblyState.Complete;
_state = _remainingBodyBytes > 0 ? AssemblyState.ExpectingContentBody : AssemblyState.Complete;
}

private enum AssemblyState
{
ExpectingMethod,
ExpectingContentHeader,
ExpectingContentBody,
Complete
}
}
}
12 changes: 6 additions & 6 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,9 @@ public void ClosingLoop()
}
}

public Command ConnectionCloseWrapper(ushort reasonCode, string reasonText)
public OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateConnectionClose(reasonCode, reasonText, out Command request, out _, out _);
Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _, out _);
return request;
}

Expand Down Expand Up @@ -567,7 +567,7 @@ public void MainLoopIteration()
{
NotifyHeartbeatListener();
// We have received an actual frame.
if (frame.IsHeartbeat())
if (frame.Type == FrameType.FrameHeartbeat)
{
// Ignore it: we've already just reset the heartbeat
// latch.
Expand Down Expand Up @@ -769,7 +769,7 @@ public void QuiesceChannel(SoftProtocolException pe)
// the quiesce process.

ISession newSession = new QuiescingSession(this,
pe.Channel,
(ushort)pe.Channel,
pe.ShutdownReason);

// Here we detach the session from the connection. It's
Expand Down Expand Up @@ -1053,9 +1053,9 @@ private void Dispose(bool disposing)
// dispose unmanaged resources
}

Command ChannelCloseWrapper(ushort reasonCode, string reasonText)
internal OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText)
{
Protocol.CreateChannelClose(reasonCode, reasonText, out Command request, out _, out _);
Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request, out _, out _);
return request;
}

Expand Down
23 changes: 3 additions & 20 deletions projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,15 @@ public static Memory<byte> GetHeartbeatFrame()

internal readonly struct InboundFrame : IDisposable
{
public readonly ReadOnlyMemory<byte> Payload;
public readonly int Channel;
public readonly FrameType Type;
public readonly int Channel;
public readonly ReadOnlyMemory<byte> Payload;

private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload)
{
Payload = payload;
Type = type;
Channel = channel;
}

public bool IsMethod()
{
return Type == FrameType.FrameMethod;
}
public bool IsHeader()
{
return Type == FrameType.FrameHeader;
}
public bool IsBody()
{
return Type == FrameType.FrameBody;
}
public bool IsHeartbeat()
{
return Type == FrameType.FrameHeartbeat;
Payload = payload;
}

private static void ProcessProtocolHeader(Stream reader)
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
{
interface IRpcContinuation
{
void HandleCommand(Command cmd);
void HandleCommand(in IncomingCommand cmd);
void HandleModelShutdown(ShutdownEventArgs reason);
}
}
Loading