Skip to content

Commit f15fd77

Browse files
bollhalsmichaelklishin
authored andcommitted
fix issue 868
1 parent ba7b055 commit f15fd77

14 files changed

+344
-218
lines changed

projects/RabbitMQ.Client/client/impl/Command.cs

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@
4040

4141
using System;
4242
using System.Buffers;
43-
using System.Collections.Generic;
4443
using System.Runtime.InteropServices;
45-
using RabbitMQ.Client.Exceptions;
4644
using RabbitMQ.Client.Framing.Impl;
4745

4846
namespace RabbitMQ.Client.Impl
@@ -75,21 +73,55 @@ public Command(MethodBase method, ContentHeaderBase header, ReadOnlyMemory<byte>
7573

7674
internal MethodBase Method { get; private set; }
7775

78-
internal void Transmit(int channelNumber, Connection connection)
76+
internal void Transmit(ushort channelNumber, Connection connection)
7977
{
80-
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
78+
int maxBodyPayloadBytes = (int)(connection.FrameMax == 0 ? int.MaxValue : connection.FrameMax - EmptyFrameSize);
79+
var size = GetMaxSize(maxBodyPayloadBytes);
80+
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(size), 0, size);
81+
var span = memory.Span;
82+
83+
var offset = Framing.Method.WriteTo(span, channelNumber, Method);
8184
if (Method.HasContent)
8285
{
83-
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
84-
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
85-
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
86-
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
86+
int remainingBodyBytes = Body.Length;
87+
offset += Framing.Header.WriteTo(span.Slice(offset), channelNumber, Header, remainingBodyBytes);
88+
var bodySpan = Body.Span;
89+
while (remainingBodyBytes > 0)
8790
{
88-
int remaining = Body.Length - offset;
89-
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
90-
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
91+
int frameSize = remainingBodyBytes > maxBodyPayloadBytes ? maxBodyPayloadBytes : remainingBodyBytes;
92+
offset += Framing.BodySegment.WriteTo(span.Slice(offset), channelNumber, bodySpan.Slice(bodySpan.Length - remainingBodyBytes, frameSize));
93+
remainingBodyBytes -= frameSize;
9194
}
9295
}
96+
97+
if (offset != size)
98+
{
99+
throw new InvalidOperationException($"Serialized to wrong size, expect {size}, offset {offset}");
100+
}
101+
102+
connection.Write(memory);
103+
}
104+
105+
private int GetMaxSize(int maxPayloadBytes)
106+
{
107+
if (!Method.HasContent)
108+
{
109+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize();
110+
}
111+
112+
return Framing.Method.FrameSize + Method.GetRequiredBufferSize() +
113+
Framing.Header.FrameSize + Header.GetRequiredPayloadBufferSize() +
114+
Framing.BodySegment.FrameSize * GetBodyFrameCount(maxPayloadBytes) + Body.Length;
115+
}
116+
117+
private int GetBodyFrameCount(int maxPayloadBytes)
118+
{
119+
if (maxPayloadBytes == int.MaxValue)
120+
{
121+
return 1;
122+
}
123+
124+
return (Body.Length + maxPayloadBytes - 1) / maxPayloadBytes;
93125
}
94126

95127
public void Dispose()

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ public Command HandleFrame(in InboundFrame f)
9292

9393
ReadOnlySpan<byte> span = f.Payload.Span;
9494
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
95-
ulong totalBodyBytes = m_header.ReadFrom(span.Slice(2));
95+
m_header.ReadFrom(span.Slice(12));
96+
ulong totalBodyBytes = NetworkOrderDeserializer.ReadUInt64(span.Slice(4));
9697
if (totalBodyBytes > MaxArrayOfBytesSize)
9798
{
9899
throw new UnexpectedFrameException(f.Type);

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
//---------------------------------------------------------------------------
4040

4141
using System;
42+
using System.Buffers;
4243
using System.Collections.Generic;
4344
using System.IO;
4445
using System.Net;
@@ -62,7 +63,6 @@ internal sealed class Connection : IConnection
6263
private readonly object _eventLock = new object();
6364

6465
///<summary>Heartbeat frame for transmission. Reusable across connections.</summary>
65-
private readonly EmptyOutboundFrame _heartbeatFrame = new EmptyOutboundFrame();
6666

6767
private readonly ManualResetEventSlim _appContinuation = new ManualResetEventSlim(false);
6868

@@ -903,7 +903,9 @@ public void HeartbeatWriteTimerCallback(object state)
903903
{
904904
if (!_closed)
905905
{
906-
WriteFrame(_heartbeatFrame);
906+
var memory = new Memory<byte>(ArrayPool<byte>.Shared.Rent(Client.Impl.Framing.Heartbeat.FrameSize), 0, Client.Impl.Framing.Heartbeat.FrameSize);
907+
Client.Impl.Framing.Heartbeat.Payload.CopyTo(memory.Span);
908+
Write(memory);
907909
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
908910
}
909911
}
@@ -940,9 +942,9 @@ public override string ToString()
940942
return string.Format("Connection({0},{1})", _id, Endpoint);
941943
}
942944

943-
public void WriteFrame(OutboundFrame f)
945+
public void Write(Memory<byte> memory)
944946
{
945-
_frameHandler.WriteFrame(f);
947+
_frameHandler.Write(memory);
946948
}
947949

948950
public void UpdateSecret(string newSecret, string reason)

projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@
4141
using System;
4242
using System.Text;
4343

44-
using RabbitMQ.Util;
45-
4644
namespace RabbitMQ.Client.Impl
4745
{
4846
abstract class ContentHeaderBase : IContentHeader
@@ -67,35 +65,15 @@ public virtual object Clone()
6765
///<summary>
6866
/// Fill this instance from the given byte buffer stream.
6967
///</summary>
70-
internal ulong ReadFrom(ReadOnlySpan<byte> span)
68+
internal void ReadFrom(ReadOnlySpan<byte> span)
7169
{
72-
// Skipping the first two bytes since they arent used (weight - not currently used)
73-
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(span.Slice(2));
74-
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span.Slice(10));
70+
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span);
7571
ReadPropertiesFrom(ref reader);
76-
return bodySize;
7772
}
7873

7974
internal abstract void ReadPropertiesFrom(ref ContentHeaderPropertyReader reader);
8075
internal abstract void WritePropertiesTo(ref ContentHeaderPropertyWriter writer);
8176

82-
private const ushort ZERO = 0;
83-
84-
internal int WriteTo(Span<byte> span, ulong bodySize)
85-
{
86-
NetworkOrderSerializer.WriteUInt16(span, ZERO); // Weight - not used
87-
NetworkOrderSerializer.WriteUInt64(span.Slice(2), bodySize);
88-
89-
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(span.Slice(10));
90-
WritePropertiesTo(ref writer);
91-
return 10 + writer.Offset;
92-
}
93-
public int GetRequiredBufferSize()
94-
{
95-
// The first 10 bytes are the Weight (2 bytes) + body size (8 bytes)
96-
return 10 + GetRequiredPayloadBufferSize();
97-
}
98-
9977
public abstract int GetRequiredPayloadBufferSize();
10078
}
10179
}

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 87 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -42,133 +42,117 @@
4242
using System.Buffers;
4343
using System.IO;
4444
using System.Net.Sockets;
45+
using System.Runtime.CompilerServices;
4546
using System.Runtime.ExceptionServices;
4647
using System.Runtime.InteropServices;
4748
using RabbitMQ.Client.Exceptions;
4849
using RabbitMQ.Util;
4950

5051
namespace RabbitMQ.Client.Impl
5152
{
52-
class HeaderOutboundFrame : OutboundFrame
53+
internal static class Framing
5354
{
54-
private readonly ContentHeaderBase _header;
55-
private readonly int _bodyLength;
56-
57-
internal HeaderOutboundFrame(int channel, ContentHeaderBase header, int bodyLength) : base(FrameType.FrameHeader, channel)
58-
{
59-
_header = header;
60-
_bodyLength = bodyLength;
61-
}
62-
63-
internal override int GetMinimumPayloadBufferSize()
64-
{
65-
// ProtocolClassId (2) + header (X bytes)
66-
return 2 + _header.GetRequiredBufferSize();
67-
}
68-
69-
internal override int WritePayload(Span<byte> span)
70-
{
71-
// write protocol class id (2 bytes)
72-
NetworkOrderSerializer.WriteUInt16(span, _header.ProtocolClassId);
73-
// write header (X bytes)
74-
int bytesWritten = _header.WriteTo(span.Slice(2), (ulong)_bodyLength);
75-
return bytesWritten + 2;
76-
}
77-
}
78-
79-
class BodySegmentOutboundFrame : OutboundFrame
80-
{
81-
private readonly ReadOnlyMemory<byte> _body;
82-
83-
internal BodySegmentOutboundFrame(int channel, ReadOnlyMemory<byte> bodySegment) : base(FrameType.FrameBody, channel)
84-
{
85-
_body = bodySegment;
86-
}
87-
88-
internal override int GetMinimumPayloadBufferSize()
89-
{
90-
return _body.Length;
91-
}
92-
93-
internal override int WritePayload(Span<byte> span)
94-
{
95-
_body.Span.CopyTo(span);
96-
return _body.Length;
97-
}
98-
}
99-
100-
class MethodOutboundFrame : OutboundFrame
101-
{
102-
private readonly MethodBase _method;
103-
104-
internal MethodOutboundFrame(int channel, MethodBase method) : base(FrameType.FrameMethod, channel)
105-
{
106-
_method = method;
107-
}
108-
109-
internal override int GetMinimumPayloadBufferSize()
55+
/* +------------+---------+----------------+---------+------------------+
56+
* | Frame Type | Channel | Payload length | Payload | Frame End Marker |
57+
* +------------+---------+----------------+---------+------------------+
58+
* | 1 byte | 2 bytes | 4 bytes | x bytes | 1 byte |
59+
* +------------+---------+----------------+---------+------------------+ */
60+
private const int BaseFrameSize = 1 + 2 + 4 + 1;
61+
private const int StartPayload = 7;
62+
63+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
64+
private static int WriteBaseFrame(Span<byte> span, FrameType type, ushort channel, int payloadLength)
11065
{
111-
// class id (2 bytes) + method id (2 bytes) + arguments (X bytes)
112-
return 4 + _method.GetRequiredBufferSize();
66+
const int StartFrameType = 0;
67+
const int StartChannel = 1;
68+
const int StartPayloadSize = 3;
69+
70+
span[StartFrameType] = (byte)type;
71+
NetworkOrderSerializer.WriteUInt16(span.Slice(StartChannel), channel);
72+
NetworkOrderSerializer.WriteUInt32(span.Slice(StartPayloadSize), (uint)payloadLength);
73+
span[StartPayload + payloadLength] = Constants.FrameEnd;
74+
return StartPayload + 1 + payloadLength;
11375
}
11476

115-
internal override int WritePayload(Span<byte> span)
116-
{
117-
NetworkOrderSerializer.WriteUInt16(span, _method.ProtocolClassId);
118-
NetworkOrderSerializer.WriteUInt16(span.Slice(2), _method.ProtocolMethodId);
119-
var argWriter = new MethodArgumentWriter(span.Slice(4));
120-
_method.WriteArgumentsTo(ref argWriter);
121-
return 4 + argWriter.Offset;
122-
}
123-
}
124-
125-
class EmptyOutboundFrame : OutboundFrame
126-
{
127-
public EmptyOutboundFrame() : base(FrameType.FrameHeartbeat, 0)
77+
internal static class Method
12878
{
79+
/* +----------+-----------+-----------+
80+
* | Class Id | Method Id | Arguments |
81+
* +----------+-----------+-----------+
82+
* | 2 bytes | 2 bytes | x bytes |
83+
* +----------+-----------+-----------+ */
84+
public const int FrameSize = BaseFrameSize + 2 + 2;
85+
86+
public static int WriteTo(Span<byte> span, ushort channel, MethodBase method)
87+
{
88+
const int StartClassId = StartPayload;
89+
const int StartMethodId = StartPayload + 2;
90+
const int StartMethodArguments = StartPayload + 4;
91+
92+
NetworkOrderSerializer.WriteUInt16(span.Slice(StartClassId), method.ProtocolClassId);
93+
NetworkOrderSerializer.WriteUInt16(span.Slice(StartMethodId), method.ProtocolMethodId);
94+
var argWriter = new MethodArgumentWriter(span.Slice(StartMethodArguments));
95+
method.WriteArgumentsTo(ref argWriter);
96+
return WriteBaseFrame(span, FrameType.FrameMethod, channel, StartMethodArguments - StartPayload + argWriter.Offset);
97+
}
12998
}
13099

131-
internal override int GetMinimumPayloadBufferSize()
100+
internal static class Header
132101
{
133-
return 0;
102+
/* +----------+----------+-------------------+-----------+
103+
* | Class Id | (unused) | Total body length | Arguments |
104+
* +----------+----------+-------------------+-----------+
105+
* | 2 bytes | 2 bytes | 8 bytes | x bytes |
106+
* +----------+----------+-------------------+-----------+ */
107+
public const int FrameSize = BaseFrameSize + 2 + 2 + 8;
108+
109+
public static int WriteTo(Span<byte> span, ushort channel, ContentHeaderBase header, int bodyLength)
110+
{
111+
const int StartClassId = StartPayload;
112+
const int StartWeight = StartPayload + 2;
113+
const int StartBodyLength = StartPayload + 4;
114+
const int StartHeaderArguments = StartPayload + 12;
115+
116+
NetworkOrderSerializer.WriteUInt16(span.Slice(StartClassId), header.ProtocolClassId);
117+
NetworkOrderSerializer.WriteUInt16(span.Slice(StartWeight), 0); // Weight - not used
118+
NetworkOrderSerializer.WriteUInt64(span.Slice(StartBodyLength), (ulong)bodyLength);
119+
var headerWriter = new ContentHeaderPropertyWriter(span.Slice(StartHeaderArguments));
120+
header.WritePropertiesTo(ref headerWriter);
121+
return WriteBaseFrame(span, FrameType.FrameHeader, channel, StartHeaderArguments - StartPayload + headerWriter.Offset);
122+
}
134123
}
135124

136-
internal override int WritePayload(Span<byte> span)
125+
internal static class BodySegment
137126
{
138-
return 0;
139-
}
140-
}
141-
142-
internal abstract class OutboundFrame
143-
{
144-
public int Channel { get; }
145-
public FrameType Type { get; }
127+
/* +--------------+
128+
* | Body segment |
129+
* +--------------+
130+
* | x bytes |
131+
* +--------------+ */
132+
public const int FrameSize = BaseFrameSize;
133+
134+
public static int WriteTo(Span<byte> span, ushort channel, ReadOnlySpan<byte> body)
135+
{
136+
const int StartBodyArgument = StartPayload;
146137

147-
protected OutboundFrame(FrameType type, int channel)
148-
{
149-
Type = type;
150-
Channel = channel;
138+
body.CopyTo(span.Slice(StartBodyArgument));
139+
return WriteBaseFrame(span, FrameType.FrameBody, channel, StartBodyArgument - StartPayload + body.Length);
140+
}
151141
}
152142

153-
internal void WriteTo(Span<byte> span)
143+
internal static class Heartbeat
154144
{
155-
span[0] = (byte)Type;
156-
NetworkOrderSerializer.WriteUInt16(span.Slice(1), (ushort)Channel);
157-
int bytesWritten = WritePayload(span.Slice(7));
158-
NetworkOrderSerializer.WriteUInt32(span.Slice(3), (uint)bytesWritten);
159-
span[bytesWritten + 7] = Constants.FrameEnd;
160-
}
145+
/* Empty frame */
146+
public const int FrameSize = BaseFrameSize;
161147

162-
internal abstract int WritePayload(Span<byte> span);
163-
internal abstract int GetMinimumPayloadBufferSize();
164-
internal int GetMinimumBufferSize()
165-
{
166-
return 8 + GetMinimumPayloadBufferSize();
167-
}
148+
public static Span<byte> Payload => new byte[]
149+
{
150+
Constants.FrameHeartbeat,
151+
0, 0, // channel
152+
0, 0, 0, 0, // payload length
153+
Constants.FrameEnd
154+
};
168155

169-
public override string ToString()
170-
{
171-
return $"(type={Type}, channel={Channel})";
172156
}
173157
}
174158

0 commit comments

Comments
 (0)