Skip to content

Commit 49d60dc

Browse files
authored
Merge pull request #903 from rabbitmq/backport-pr-902
Backport #902 to 6.x
2 parents 787deac + a844d70 commit 49d60dc

19 files changed

+221
-223
lines changed

projects/Apigen/apigen/Apigen.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1425,7 +1425,7 @@ string GetParameterString(ParameterInfo pi)
14251425
throw new NotImplementedException();
14261426
}
14271427

1428-
EmitLine(" public override bool DispatchAsynchronous(Client.Impl.Command cmd) {");
1428+
EmitLine(" public override bool DispatchAsynchronous(in IncomingCommand cmd) {");
14291429
EmitLine(" switch ((cmd.Method.ProtocolClassId << 16) | cmd.Method.ProtocolMethodId)");
14301430
EmitLine(" {");
14311431
foreach (MethodInfo method in asynchronousHandlers)

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ public IConsumerDispatcher ConsumerDispatcher
6868
{
6969
get
7070
{
71-
if (_disposed)
72-
{
73-
throw new ObjectDisposedException(GetType().FullName);
74-
}
71+
if (_disposed)
72+
{
73+
throw new ObjectDisposedException(GetType().FullName);
74+
}
7575

7676
return _delegate.ConsumerDispatcher;
7777
}
@@ -477,14 +477,14 @@ public void Close(ShutdownEventArgs reason, bool abort)
477477
}
478478
}
479479

480-
public bool DispatchAsynchronous(Command cmd)
480+
public bool DispatchAsynchronous(in IncomingCommand cmd)
481481
{
482482
if (_disposed)
483483
{
484484
throw new ObjectDisposedException(GetType().FullName);
485485
}
486486

487-
return _delegate.DispatchAsynchronous(cmd);
487+
return _delegate.DispatchAsynchronous(in cmd);
488488
}
489489

490490
public void FinishClose()
@@ -497,16 +497,6 @@ public void FinishClose()
497497
_delegate.FinishClose();
498498
}
499499

500-
public void HandleCommand(ISession session, Command cmd)
501-
{
502-
if (_disposed)
503-
{
504-
throw new ObjectDisposedException(GetType().FullName);
505-
}
506-
507-
_delegate.HandleCommand(session, cmd);
508-
}
509-
510500
public void OnBasicAck(BasicAckEventArgs args)
511501
{
512502
if (_disposed)

projects/RabbitMQ.Client/client/impl/BasicPublishBatch.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,12 @@ namespace RabbitMQ.Client.Impl
4747
{
4848
class BasicPublishBatch : IBasicPublishBatch
4949
{
50-
private readonly List<Command> _commands = new List<Command>();
50+
private readonly List<OutgoingCommand> _commands;
5151
private readonly ModelBase _model;
5252
internal BasicPublishBatch (ModelBase model)
5353
{
5454
_model = model;
55+
_commands = new List<OutgoingCommand>();
5556
}
5657

5758
public void Add(string exchange, string routingKey, bool mandatory, IBasicProperties basicProperties, byte[] body)
@@ -69,7 +70,7 @@ public void Add(string exchange, string routingKey, bool mandatory, IBasicProper
6970
_mandatory = mandatory
7071
};
7172

72-
_commands.Add(new Command(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body, false));
73+
_commands.Add(new OutgoingCommand(method, (ContentHeaderBase)(basicProperties ?? _model._emptyBasicProperties), body));
7374
}
7475

7576
public void Publish()

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 91 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -46,117 +46,127 @@
4646

4747
namespace RabbitMQ.Client.Impl
4848
{
49-
enum AssemblyState
50-
{
51-
ExpectingMethod,
52-
ExpectingContentHeader,
53-
ExpectingContentBody,
54-
Complete
55-
}
56-
57-
class CommandAssembler
49+
internal sealed class CommandAssembler
5850
{
5951
private const int MaxArrayOfBytesSize = 2_147_483_591;
6052

61-
public MethodBase m_method;
62-
public ContentHeaderBase m_header;
63-
public Memory<byte> m_body;
64-
public ProtocolBase m_protocol;
65-
public int m_remainingBodyBytes;
53+
private readonly ProtocolBase _protocol;
54+
55+
private MethodBase _method;
56+
private ContentHeaderBase _header;
57+
private byte[] _bodyBytes;
58+
private Memory<byte> _body;
59+
private int _remainingBodyBytes;
6660
private int _offset;
67-
public AssemblyState m_state;
61+
private AssemblyState _state;
6862

6963
public CommandAssembler(ProtocolBase protocol)
7064
{
71-
m_protocol = protocol;
65+
_protocol = protocol;
7266
Reset();
7367
}
7468

75-
public Command HandleFrame(in InboundFrame f)
69+
private void Reset()
70+
{
71+
_method = null;
72+
_header = null;
73+
_bodyBytes = null;
74+
_body = Memory<byte>.Empty;
75+
_remainingBodyBytes = 0;
76+
_offset = 0;
77+
_state = AssemblyState.ExpectingMethod;
78+
}
79+
80+
public IncomingCommand HandleFrame(in InboundFrame frame)
7681
{
77-
switch (m_state)
82+
switch (_state)
7883
{
7984
case AssemblyState.ExpectingMethod:
80-
if (!f.IsMethod())
81-
{
82-
throw new UnexpectedFrameException(f.Type);
83-
}
84-
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
85-
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
86-
return CompletedCommand();
85+
ParseMethodFrame(in frame);
86+
break;
8787
case AssemblyState.ExpectingContentHeader:
88-
if (!f.IsHeader())
89-
{
90-
throw new UnexpectedFrameException(f.Type);
91-
}
92-
93-
ReadOnlySpan<byte> span = f.Payload.Span;
94-
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
95-
m_header.ReadFrom(span.Slice(12));
96-
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
97-
if (totalBodyBytes > MaxArrayOfBytesSize)
98-
{
99-
throw new UnexpectedFrameException(f.Type);
100-
}
101-
102-
m_remainingBodyBytes = (int)totalBodyBytes;
103-
104-
// Is returned by Command.Dispose in Session.HandleFrame
105-
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(m_remainingBodyBytes);
106-
m_body = new Memory<byte>(bodyBytes, 0, m_remainingBodyBytes);
107-
UpdateContentBodyState();
108-
return CompletedCommand();
88+
ParseHeaderFrame(in frame);
89+
break;
10990
case AssemblyState.ExpectingContentBody:
110-
if (!f.IsBody())
111-
{
112-
throw new UnexpectedFrameException(f.Type);
113-
}
114-
115-
if (f.Payload.Length > m_remainingBodyBytes)
116-
{
117-
throw new MalformedFrameException($"Overlong content body received - {m_remainingBodyBytes} bytes remaining, {f.Payload.Length} bytes received");
118-
}
119-
120-
f.Payload.CopyTo(m_body.Slice(_offset));
121-
m_remainingBodyBytes -= f.Payload.Length;
122-
_offset += f.Payload.Length;
123-
UpdateContentBodyState();
124-
return CompletedCommand();
125-
case AssemblyState.Complete:
126-
default:
127-
return null;
91+
ParseBodyFrame(in frame);
92+
break;
93+
}
94+
95+
if (_state != AssemblyState.Complete)
96+
{
97+
return IncomingCommand.Empty;
98+
}
99+
100+
var result = new IncomingCommand(_method, _header, _body, _bodyBytes);
101+
Reset();
102+
return result;
103+
}
104+
105+
private void ParseMethodFrame(in InboundFrame frame)
106+
{
107+
if (frame.Type != FrameType.FrameMethod)
108+
{
109+
throw new UnexpectedFrameException(frame.Type);
128110
}
111+
112+
_method = _protocol.DecodeMethodFrom(frame.Payload.Span);
113+
_state = _method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
129114
}
130115

131-
private Command CompletedCommand()
116+
private void ParseHeaderFrame(in InboundFrame frame)
132117
{
133-
if (m_state == AssemblyState.Complete)
118+
if (frame.Type != FrameType.FrameHeader)
134119
{
135-
Command result = new Command(m_method, m_header, m_body, true);
136-
Reset();
137-
return result;
120+
throw new UnexpectedFrameException(frame.Type);
138121
}
139-
else
122+
123+
ReadOnlySpan<byte> span = frame.Payload.Span;
124+
_header = _protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
125+
_header.ReadFrom(span.Slice(12));
126+
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
127+
if (totalBodyBytes > MaxArrayOfBytesSize)
140128
{
141-
return null;
129+
throw new UnexpectedFrameException(frame.Type);
142130
}
131+
132+
_remainingBodyBytes = (int) totalBodyBytes;
133+
134+
// Is returned by IncomingCommand.Dispose in Session.HandleFrame
135+
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
136+
_body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes);
137+
UpdateContentBodyState();
143138
}
144139

145-
private void Reset()
140+
private void ParseBodyFrame(in InboundFrame frame)
146141
{
147-
m_state = AssemblyState.ExpectingMethod;
148-
m_method = null;
149-
m_header = null;
150-
m_body = null;
151-
_offset = 0;
152-
m_remainingBodyBytes = 0;
142+
if (frame.Type != FrameType.FrameBody)
143+
{
144+
throw new UnexpectedFrameException(frame.Type);
145+
}
146+
147+
int payloadLength = frame.Payload.Length;
148+
if (payloadLength > _remainingBodyBytes)
149+
{
150+
throw new MalformedFrameException($"Overlong content body received - {_remainingBodyBytes} bytes remaining, {payloadLength} bytes received");
151+
}
152+
153+
frame.Payload.CopyTo(_body.Slice(_offset));
154+
_remainingBodyBytes -= payloadLength;
155+
_offset += payloadLength;
156+
UpdateContentBodyState();
153157
}
154158

155159
private void UpdateContentBodyState()
156160
{
157-
m_state = (m_remainingBodyBytes > 0)
158-
? AssemblyState.ExpectingContentBody
159-
: AssemblyState.Complete;
161+
_state = _remainingBodyBytes > 0 ? AssemblyState.ExpectingContentBody : AssemblyState.Complete;
162+
}
163+
164+
private enum AssemblyState
165+
{
166+
ExpectingMethod,
167+
ExpectingContentHeader,
168+
ExpectingContentBody,
169+
Complete
160170
}
161171
}
162172
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -389,9 +389,9 @@ public void ClosingLoop()
389389
}
390390
}
391391

392-
public Command ConnectionCloseWrapper(ushort reasonCode, string reasonText)
392+
public OutgoingCommand ConnectionCloseWrapper(ushort reasonCode, string reasonText)
393393
{
394-
Protocol.CreateConnectionClose(reasonCode, reasonText, out Command request, out _, out _);
394+
Protocol.CreateConnectionClose(reasonCode, reasonText, out OutgoingCommand request, out _, out _);
395395
return request;
396396
}
397397

@@ -567,7 +567,7 @@ public void MainLoopIteration()
567567
{
568568
NotifyHeartbeatListener();
569569
// We have received an actual frame.
570-
if (frame.IsHeartbeat())
570+
if (frame.Type == FrameType.FrameHeartbeat)
571571
{
572572
// Ignore it: we've already just reset the heartbeat
573573
// latch.
@@ -769,7 +769,7 @@ public void QuiesceChannel(SoftProtocolException pe)
769769
// the quiesce process.
770770

771771
ISession newSession = new QuiescingSession(this,
772-
pe.Channel,
772+
(ushort)pe.Channel,
773773
pe.ShutdownReason);
774774

775775
// Here we detach the session from the connection. It's
@@ -1053,9 +1053,9 @@ private void Dispose(bool disposing)
10531053
// dispose unmanaged resources
10541054
}
10551055

1056-
Command ChannelCloseWrapper(ushort reasonCode, string reasonText)
1056+
internal OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonText)
10571057
{
1058-
Protocol.CreateChannelClose(reasonCode, reasonText, out Command request, out _, out _);
1058+
Protocol.CreateChannelClose(reasonCode, reasonText, out OutgoingCommand request, out _, out _);
10591059
return request;
10601060
}
10611061

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -168,32 +168,15 @@ public static Memory<byte> GetHeartbeatFrame()
168168

169169
internal readonly struct InboundFrame : IDisposable
170170
{
171-
public readonly ReadOnlyMemory<byte> Payload;
172-
public readonly int Channel;
173171
public readonly FrameType Type;
172+
public readonly int Channel;
173+
public readonly ReadOnlyMemory<byte> Payload;
174174

175175
private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload)
176176
{
177-
Payload = payload;
178177
Type = type;
179178
Channel = channel;
180-
}
181-
182-
public bool IsMethod()
183-
{
184-
return Type == FrameType.FrameMethod;
185-
}
186-
public bool IsHeader()
187-
{
188-
return Type == FrameType.FrameHeader;
189-
}
190-
public bool IsBody()
191-
{
192-
return Type == FrameType.FrameBody;
193-
}
194-
public bool IsHeartbeat()
195-
{
196-
return Type == FrameType.FrameHeartbeat;
179+
Payload = payload;
197180
}
198181

199182
private static void ProcessProtocolHeader(Stream reader)

projects/RabbitMQ.Client/client/impl/IRpcContinuation.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ namespace RabbitMQ.Client.Impl
4242
{
4343
interface IRpcContinuation
4444
{
45-
void HandleCommand(Command cmd);
45+
void HandleCommand(in IncomingCommand cmd);
4646
void HandleModelShutdown(ShutdownEventArgs reason);
4747
}
4848
}

0 commit comments

Comments
 (0)