Skip to content

Commit ba7b055

Browse files
bollhalsmichaelklishin
authored andcommitted
spanification of WireFormatting & Reader/Writer
1 parent 195e2b2 commit ba7b055

18 files changed

+374
-362
lines changed

projects/Apigen/apigen/Apigen.cs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -855,9 +855,28 @@ public void EmitClassMethodImplementations(AmqpClass c)
855855
EmitLine("");
856856
EmitLine(" public override void WriteArgumentsTo(ref Client.Impl.MethodArgumentWriter writer)");
857857
EmitLine(" {");
858+
var lastWasBitClass = false;
858859
foreach (AmqpField f in m.m_Fields)
859860
{
860-
EmitLine($" writer.Write{MangleClass(ResolveDomain(f.Domain))}(_{MangleMethod(f.Name)});");
861+
string mangleClass = MangleClass(ResolveDomain(f.Domain));
862+
if (mangleClass != "Bit")
863+
{
864+
if (lastWasBitClass)
865+
{
866+
EmitLine($" writer.EndBits();");
867+
lastWasBitClass = false;
868+
}
869+
}
870+
else
871+
{
872+
lastWasBitClass = true;
873+
}
874+
875+
EmitLine($" writer.Write{mangleClass}(_{MangleMethod(f.Name)});");
876+
}
877+
if (lastWasBitClass)
878+
{
879+
EmitLine($" writer.EndBits();");
861880
}
862881
EmitLine(" }");
863882
EmitLine("");
@@ -933,14 +952,14 @@ public void EmitClassMethodImplementations(AmqpClass c)
933952

934953
public void EmitMethodArgumentReader()
935954
{
936-
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlyMemory<byte> memory)");
955+
EmitLine(" internal override Client.Impl.MethodBase DecodeMethodFrom(ReadOnlySpan<byte> span)");
937956
EmitLine(" {");
938-
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Span);");
939-
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(memory.Slice(2).Span);");
957+
EmitLine(" ushort classId = Util.NetworkOrderDeserializer.ReadUInt16(span);");
958+
EmitLine(" ushort methodId = Util.NetworkOrderDeserializer.ReadUInt16(span.Slice(2));");
940959
EmitLine(" Client.Impl.MethodBase result = DecodeMethodFrom(classId, methodId);");
941960
EmitLine(" if(result != null)");
942961
EmitLine(" {");
943-
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(memory.Slice(4));");
962+
EmitLine(" Client.Impl.MethodArgumentReader reader = new Client.Impl.MethodArgumentReader(span.Slice(4));");
944963
EmitLine(" result.ReadArgumentsFrom(ref reader);");
945964
EmitLine(" return result;");
946965
EmitLine(" }");

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<MinVerVerbosity>minimal</MinVerVerbosity>
2727
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
2828
<PackageOutputPath>..\..\packages</PackageOutputPath>
29+
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
2930
</PropertyGroup>
3031

3132
<PropertyGroup Condition="'$(CONCOURSE_CI_BUILD)' == 'true'">

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,16 +81,18 @@ public Command HandleFrame(in InboundFrame f)
8181
{
8282
throw new UnexpectedFrameException(f.Type);
8383
}
84-
m_method = m_protocol.DecodeMethodFrom(f.Payload);
84+
m_method = m_protocol.DecodeMethodFrom(f.Payload.Span);
8585
m_state = m_method.HasContent ? AssemblyState.ExpectingContentHeader : AssemblyState.Complete;
8686
return CompletedCommand();
8787
case AssemblyState.ExpectingContentHeader:
8888
if (!f.IsHeader())
8989
{
9090
throw new UnexpectedFrameException(f.Type);
9191
}
92-
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(f.Payload.Span));
93-
ulong totalBodyBytes = m_header.ReadFrom(f.Payload.Slice(2));
92+
93+
ReadOnlySpan<byte> span = f.Payload.Span;
94+
m_header = m_protocol.DecodeContentHeaderFrom(NetworkOrderDeserializer.ReadUInt16(span));
95+
ulong totalBodyBytes = m_header.ReadFrom(span.Slice(2));
9496
if (totalBodyBytes > MaxArrayOfBytesSize)
9597
{
9698
throw new UnexpectedFrameException(f.Type);

projects/RabbitMQ.Client/client/impl/ContentHeaderBase.cs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,11 @@ public virtual object Clone()
6767
///<summary>
6868
/// Fill this instance from the given byte buffer stream.
6969
///</summary>
70-
internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
70+
internal ulong ReadFrom(ReadOnlySpan<byte> span)
7171
{
7272
// Skipping the first two bytes since they arent used (weight - not currently used)
73-
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(memory.Slice(2).Span);
74-
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(memory.Slice(10));
73+
ulong bodySize = NetworkOrderDeserializer.ReadUInt64(span.Slice(2));
74+
ContentHeaderPropertyReader reader = new ContentHeaderPropertyReader(span.Slice(10));
7575
ReadPropertiesFrom(ref reader);
7676
return bodySize;
7777
}
@@ -81,13 +81,12 @@ internal ulong ReadFrom(ReadOnlyMemory<byte> memory)
8181

8282
private const ushort ZERO = 0;
8383

84-
internal int WriteTo(Memory<byte> memory, ulong bodySize)
84+
internal int WriteTo(Span<byte> span, ulong bodySize)
8585
{
86-
var span = memory.Span;
8786
NetworkOrderSerializer.WriteUInt16(span, ZERO); // Weight - not used
8887
NetworkOrderSerializer.WriteUInt64(span.Slice(2), bodySize);
8988

90-
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(memory.Slice(10));
89+
ContentHeaderPropertyWriter writer = new ContentHeaderPropertyWriter(span.Slice(10));
9190
WritePropertiesTo(ref writer);
9291
return 10 + writer.Offset;
9392
}

projects/RabbitMQ.Client/client/impl/ContentHeaderPropertyReader.cs

Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -45,26 +45,28 @@
4545

4646
namespace RabbitMQ.Client.Impl
4747
{
48-
internal struct ContentHeaderPropertyReader
48+
internal ref struct ContentHeaderPropertyReader
4949
{
50-
private ushort m_bitCount;
51-
private ushort m_flagWord;
52-
private int _memoryOffset;
53-
private readonly ReadOnlyMemory<byte> _memory;
50+
private const int StartBitMask = 0b1000_0000_0000_0000;
51+
private const int EndBitMask = 0b0000_0000_0000_0001;
5452

55-
public ContentHeaderPropertyReader(ReadOnlyMemory<byte> memory)
56-
{
57-
_memory = memory;
58-
_memoryOffset = 0;
59-
m_flagWord = 1; // just the continuation bit
60-
m_bitCount = 15; // the correct position to force a m_flagWord read
61-
}
53+
private readonly ReadOnlySpan<byte> _span;
54+
private int _offset;
55+
private int _bitMask;
56+
private int _bits;
6257

63-
public bool ContinuationBitSet
58+
private ReadOnlySpan<byte> Span => _span.Slice(_offset);
59+
60+
public ContentHeaderPropertyReader(ReadOnlySpan<byte> span)
6461
{
65-
get { return (m_flagWord & 1) != 0; }
62+
_span = span;
63+
_offset = 0;
64+
_bitMask = EndBitMask; // force a flag read
65+
_bits = 1; // just the continuation bit
6666
}
6767

68+
private bool ContinuationBitSet => (_bits & EndBitMask) != 0;
69+
6870
public void FinishPresence()
6971
{
7072
if (ContinuationBitSet)
@@ -78,82 +80,81 @@ public bool ReadBit()
7880
return ReadPresence();
7981
}
8082

81-
public void ReadFlagWord()
83+
private void ReadBits()
8284
{
8385
if (!ContinuationBitSet)
8486
{
8587
throw new MalformedFrameException("Attempted to read flag word when none advertised");
8688
}
87-
m_flagWord = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
88-
_memoryOffset += 2;
89-
m_bitCount = 0;
89+
_bits = NetworkOrderDeserializer.ReadUInt16(Span);
90+
_offset += 2;
91+
_bitMask = StartBitMask;
9092
}
9193

9294
public uint ReadLong()
9395
{
94-
uint result = NetworkOrderDeserializer.ReadUInt32(_memory.Slice(_memoryOffset).Span);
95-
_memoryOffset += 4;
96+
uint result = NetworkOrderDeserializer.ReadUInt32(Span);
97+
_offset += 4;
9698
return result;
9799
}
98100

99101
public ulong ReadLonglong()
100102
{
101-
ulong result = NetworkOrderDeserializer.ReadUInt64(_memory.Slice(_memoryOffset).Span);
102-
_memoryOffset += 8;
103+
ulong result = NetworkOrderDeserializer.ReadUInt64(Span);
104+
_offset += 8;
103105
return result;
104106
}
105107

106108
public byte[] ReadLongstr()
107109
{
108-
byte[] result = WireFormatting.ReadLongstr(_memory.Slice(_memoryOffset));
109-
_memoryOffset += 4 + result.Length;
110+
byte[] result = WireFormatting.ReadLongstr(Span);
111+
_offset += 4 + result.Length;
110112
return result;
111113
}
112114

113115
public byte ReadOctet()
114116
{
115-
return _memory.Span[_memoryOffset++];
117+
return _span[_offset++];
116118
}
117119

118120
public bool ReadPresence()
119121
{
120-
if (m_bitCount == 15)
122+
if (_bitMask == EndBitMask)
121123
{
122-
ReadFlagWord();
124+
ReadBits();
123125
}
124126

125-
int bit = 15 - m_bitCount;
126-
bool result = (m_flagWord & (1 << bit)) != 0;
127-
m_bitCount++;
127+
bool result = (_bits & _bitMask) != 0;
128+
_bitMask >>= 1;
128129
return result;
129130
}
130131

131132
public ushort ReadShort()
132133
{
133-
ushort result = NetworkOrderDeserializer.ReadUInt16(_memory.Slice(_memoryOffset).Span);
134-
_memoryOffset += 2;
134+
ushort result = NetworkOrderDeserializer.ReadUInt16(Span);
135+
_offset += 2;
135136
return result;
136137
}
137138

138139
public string ReadShortstr()
139140
{
140-
string result = WireFormatting.ReadShortstr(_memory.Slice(_memoryOffset), out int bytesRead);
141-
_memoryOffset += bytesRead;
141+
string result = WireFormatting.ReadShortstr(Span, out int bytesRead);
142+
_offset += bytesRead;
142143
return result;
143144
}
144145

145146
/// <returns>A type of <seealso cref="System.Collections.Generic.IDictionary{TKey,TValue}"/>.</returns>
146147
public Dictionary<string, object> ReadTable()
147148
{
148-
Dictionary<string, object> result = WireFormatting.ReadTable(_memory.Slice(_memoryOffset), out int bytesRead);
149-
_memoryOffset += bytesRead;
149+
Dictionary<string, object> result = WireFormatting.ReadTable(Span, out int bytesRead);
150+
_offset += bytesRead;
150151
return result;
151152
}
152153

153154
public AmqpTimestamp ReadTimestamp()
154155
{
155-
AmqpTimestamp result = WireFormatting.ReadTimestamp(_memory.Slice(_memoryOffset));
156-
_memoryOffset += 8;
156+
AmqpTimestamp result = WireFormatting.ReadTimestamp(Span);
157+
_offset += 8;
157158
return result;
158159
}
159160
}

projects/RabbitMQ.Client/client/impl/ContentHeaderPropertyWriter.cs

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,24 +45,31 @@
4545

4646
namespace RabbitMQ.Client.Impl
4747
{
48-
struct ContentHeaderPropertyWriter
48+
internal ref struct ContentHeaderPropertyWriter
4949
{
50-
private int _bitCount;
51-
private ushort _flagWord;
52-
public int Offset { get; private set; }
53-
public Memory<byte> Memory { get; private set; }
50+
private const ushort StartBitMask = 0b1000_0000_0000_0000;
51+
private const ushort EndBitMask = 0b0000_0000_0000_0001;
5452

55-
public ContentHeaderPropertyWriter(Memory<byte> memory)
53+
private readonly Span<byte> _span;
54+
private int _offset;
55+
private ushort _bitAccumulator;
56+
private ushort _bitMask;
57+
58+
public int Offset => _offset;
59+
60+
private Span<byte> Span => _span.Slice(_offset);
61+
62+
public ContentHeaderPropertyWriter(Span<byte> span)
5663
{
57-
Memory = memory;
58-
_flagWord = 0;
59-
_bitCount = 0;
60-
Offset = 0;
64+
_span = span;
65+
_offset = 0;
66+
_bitAccumulator = 0;
67+
_bitMask = StartBitMask;
6168
}
6269

6370
public void FinishPresence()
6471
{
65-
EmitFlagWord(false);
72+
WriteBits();
6673
}
6774

6875
public void WriteBit(bool bit)
@@ -72,65 +79,67 @@ public void WriteBit(bool bit)
7279

7380
public void WriteLong(uint val)
7481
{
75-
Offset += WireFormatting.WriteLong(Memory.Slice(Offset), val);
82+
_offset += WireFormatting.WriteLong(Span, val);
7683
}
7784

7885
public void WriteLonglong(ulong val)
7986
{
80-
Offset += WireFormatting.WriteLonglong(Memory.Slice(Offset), val);
87+
_offset += WireFormatting.WriteLonglong(Span, val);
8188
}
8289

8390
public void WriteLongstr(byte[] val)
8491
{
85-
Offset += WireFormatting.WriteLongstr(Memory.Slice(Offset), val);
92+
_offset += WireFormatting.WriteLongstr(Span, val);
8693
}
8794

8895
public void WriteOctet(byte val)
8996
{
90-
Memory.Slice(Offset++).Span[0] = val;
97+
_span[_offset++] = val;
9198
}
9299

93100
public void WritePresence(bool present)
94101
{
95-
if (_bitCount == 15)
102+
if (_bitMask == EndBitMask)
96103
{
97-
EmitFlagWord(true);
104+
// Mark continuation
105+
_bitAccumulator |= _bitMask;
106+
WriteBits();
98107
}
99108

100109
if (present)
101110
{
102-
int bit = 15 - _bitCount;
103-
_flagWord = (ushort)(_flagWord | (1 << bit));
111+
_bitAccumulator |= _bitMask;
104112
}
105-
_bitCount++;
113+
114+
_bitMask >>= 1;
106115
}
107116

108117
public void WriteShort(ushort val)
109118
{
110-
Offset += WireFormatting.WriteShort(Memory.Slice(Offset), val);
119+
_offset += WireFormatting.WriteShort(Span, val);
111120
}
112121

113122
public void WriteShortstr(string val)
114123
{
115-
Offset += WireFormatting.WriteShortstr(Memory.Slice(Offset), val);
124+
_offset += WireFormatting.WriteShortstr(Span, val);
116125
}
117126

118127
public void WriteTable(IDictionary<string, object> val)
119128
{
120-
Offset += WireFormatting.WriteTable(Memory.Slice(Offset), val);
129+
_offset += WireFormatting.WriteTable(Span, val);
121130
}
122131

123132
public void WriteTimestamp(AmqpTimestamp val)
124133
{
125-
Offset += WireFormatting.WriteTimestamp(Memory.Slice(Offset), val);
134+
_offset += WireFormatting.WriteTimestamp(Span, val);
126135
}
127136

128-
private void EmitFlagWord(bool continuationBit)
137+
private void WriteBits()
129138
{
130-
NetworkOrderSerializer.WriteUInt16(Memory.Slice(Offset).Span, (ushort)(continuationBit ? (_flagWord | 1) : _flagWord));
131-
Offset += 2;
132-
_flagWord = 0;
133-
_bitCount = 0;
139+
NetworkOrderSerializer.WriteUInt16(Span, _bitAccumulator);
140+
_offset += 2;
141+
_bitMask = StartBitMask;
142+
_bitAccumulator = 0;
134143
}
135144
}
136145
}

0 commit comments

Comments
 (0)