@@ -33,6 +33,7 @@ public ValueTask<ArraySegment<byte>> ReadPayloadAsync(ProtocolErrorBehavior prot
33
33
34
34
public ValueTask < int > WritePayloadAsync ( ArraySegment < byte > payload , IOBehavior ioBehavior )
35
35
{
36
+ // break the payload up into (possibly more than one) uncompressed packets
36
37
return ProtocolUtility . WritePayloadAsync ( m_uncompressedStreamByteHandler , GetNextUncompressedSequenceNumber , payload , ioBehavior ) . ContinueWith ( _ =>
37
38
{
38
39
if ( m_uncompressedStream . Length == 0 )
@@ -45,6 +46,7 @@ public ValueTask<int> WritePayloadAsync(ArraySegment<byte> payload, IOBehavior i
45
46
return CompressAndWrite ( uncompressedData , ioBehavior )
46
47
. ContinueWith ( __ =>
47
48
{
49
+ // reset the uncompressed stream to accept more data
48
50
m_uncompressedStream . SetLength ( 0 ) ;
49
51
return default ( ValueTask < int > ) ;
50
52
} ) ;
@@ -53,6 +55,7 @@ public ValueTask<int> WritePayloadAsync(ArraySegment<byte> payload, IOBehavior i
53
55
54
56
private ValueTask < ArraySegment < byte > > ReadBytesAsync ( int count , ProtocolErrorBehavior protocolErrorBehavior , IOBehavior ioBehavior )
55
57
{
58
+ // satisfy the read from cache if possible
56
59
if ( m_remainingData . Count > 0 )
57
60
{
58
61
int bytesToRead = Math . Min ( m_remainingData . Count , count ) ;
@@ -61,6 +64,7 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
61
64
return new ValueTask < ArraySegment < byte > > ( result ) ;
62
65
}
63
66
67
+ // read the compressed header (seven bytes)
64
68
return m_bufferedByteReader . ReadBytesAsync ( m_byteHandler , 7 , ioBehavior )
65
69
. ContinueWith ( headerReadBytes =>
66
70
{
@@ -75,6 +79,7 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
75
79
int packetSequenceNumber = headerReadBytes . Array [ headerReadBytes . Offset + 3 ] ;
76
80
var uncompressedLength = ( int ) SerializationUtility . ReadUInt32 ( headerReadBytes . Array , headerReadBytes . Offset + 4 , 3 ) ;
77
81
82
+ // verify the compressed packet sequence number
78
83
var expectedSequenceNumber = GetNextCompressedSequenceNumber ( ) % 256 ;
79
84
if ( packetSequenceNumber != expectedSequenceNumber )
80
85
{
@@ -85,11 +90,14 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
85
90
return ValueTaskExtensions . FromException < ArraySegment < byte > > ( exception ) ;
86
91
}
87
92
88
- // MySQL protocol hack: reset the uncompressed sequence number back to the sequence number of this compressed packet
93
+ // MySQL protocol resets the uncompressed sequence number back to the sequence number of this compressed packet.
94
+ // This isn't in the documentation, but the code explicitly notes that uncompressed packets are modified by compression:
95
+ // - https://github.com/mysql/mysql-server/blob/c28e258157f39f25e044bb72e8bae1ff00989a3d/sql/net_serv.cc#L276
96
+ // - https://github.com/mysql/mysql-server/blob/c28e258157f39f25e044bb72e8bae1ff00989a3d/sql/net_serv.cc#L225-L227
89
97
if ( ! m_isContinuationPacket )
90
98
m_uncompressedSequenceNumber = packetSequenceNumber ;
91
99
92
- // except when uncompressed packets need to be broken up across multiple compressed packets
100
+ // except this doesn't happen when uncompressed packets need to be broken up across multiple compressed packets
93
101
m_isContinuationPacket = payloadLength == ProtocolUtility . MaxPacketSize || uncompressedLength == ProtocolUtility . MaxPacketSize ;
94
102
95
103
return m_bufferedByteReader . ReadBytesAsync ( m_byteHandler , payloadLength , ioBehavior )
@@ -104,7 +112,7 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
104
112
105
113
if ( uncompressedLength == 0 )
106
114
{
107
- // uncompressed
115
+ // data is uncompressed
108
116
m_remainingData = payloadReadBytes ;
109
117
}
110
118
else
@@ -122,8 +130,12 @@ private ValueTask<ArraySegment<byte>> ReadBytesAsync(int count, ProtocolErrorBeh
122
130
ValueTaskExtensions . FromException < ArraySegment < byte > > ( new NotSupportedException ( "Unsupported zlib header: {0:X2}{1:X2}" . FormatInvariant ( cmf , flg ) ) ) ;
123
131
}
124
132
133
+ // zlib format (https://www.ietf.org/rfc/rfc1950.txt) is: [two header bytes] [deflate-compressed data] [four-byte checksum]
134
+ // .NET implements the middle part with DeflateStream; need to handle header and checksum explicitly
135
+ const int headerSize = 2 ;
136
+ const int checksumSize = 4 ;
125
137
var uncompressedData = new byte [ uncompressedLength ] ;
126
- using ( var compressedStream = new MemoryStream ( payloadReadBytes . Array , payloadReadBytes . Offset + 2 , payloadReadBytes . Count - 6 ) ) // TODO: handle zlib format correctly
138
+ using ( var compressedStream = new MemoryStream ( payloadReadBytes . Array , payloadReadBytes . Offset + headerSize , payloadReadBytes . Count - headerSize - checksumSize ) )
127
139
using ( var decompressingStream = new DeflateStream ( compressedStream , CompressionMode . Decompress ) )
128
140
{
129
141
var bytesRead = decompressingStream . Read ( uncompressedData , 0 , uncompressedLength ) ;
@@ -219,6 +231,7 @@ private ValueTask<int> CompressAndWrite(ArraySegment<byte> remainingUncompressed
219
231
CompressAndWrite ( remainingUncompressedData , ioBehavior ) ) ;
220
232
}
221
233
234
+ // CompressedByteHandler implements IByteHandler and delegates reading bytes back to the CompressedPayloadHandler class.
222
235
private class CompressedByteHandler : IByteHandler
223
236
{
224
237
public CompressedByteHandler ( CompressedPayloadHandler compressedPayloadHandler , ProtocolErrorBehavior protocolErrorBehavior )
0 commit comments