Skip to content

Commit 49c61dc

Browse files
stebetlukebakken
authored andcommitted
Minimal System.IO.Pipelines integration to prepare for full-async work.
1 parent cbe6a8c commit 49c61dc

File tree

9 files changed

+135
-126
lines changed

9 files changed

+135
-126
lines changed

projects/RabbitMQ.Client/RabbitMQ.Client.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
<PackageReference Include="MinVer" Version="4.3.0" PrivateAssets="all" />
6464
<PackageReference Include="System.Memory" Version="4.5.5" />
6565
<PackageReference Include="System.Threading.Channels" Version="7.0.0" />
66+
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
6667
</ItemGroup>
6768

6869
</Project>

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@ internal sealed partial class Connection
4343
private readonly IFrameHandler _frameHandler;
4444
private readonly Task _mainLoopTask;
4545

46-
private void MainLoop()
46+
private async Task MainLoop()
4747
{
4848
try
4949
{
50-
ReceiveLoop();
50+
await ReceiveLoop();
5151
}
5252
catch (EndOfStreamException eose)
5353
{
@@ -56,7 +56,7 @@ private void MainLoop()
5656
}
5757
catch (HardProtocolException hpe)
5858
{
59-
HardProtocolExceptionHandler(hpe);
59+
await HardProtocolExceptionHandler(hpe);
6060
}
6161
catch (Exception ex)
6262
{
@@ -66,11 +66,11 @@ private void MainLoop()
6666
FinishClose();
6767
}
6868

69-
private void ReceiveLoop()
69+
private async Task ReceiveLoop()
7070
{
7171
while (!_closed)
7272
{
73-
InboundFrame frame = _frameHandler.ReadFrame();
73+
InboundFrame frame = await _frameHandler.ReadFrame();
7474
NotifyHeartbeatListener();
7575

7676
bool shallReturn = true;
@@ -139,7 +139,7 @@ private void HandleMainLoopException(ShutdownEventArgs reason)
139139
LogCloseError($"Unexpected connection closure: {reason}", new Exception(reason.ToString()));
140140
}
141141

142-
private void HardProtocolExceptionHandler(HardProtocolException hpe)
142+
private async Task HardProtocolExceptionHandler(HardProtocolException hpe)
143143
{
144144
if (SetCloseReason(hpe.ShutdownReason))
145145
{
@@ -151,7 +151,7 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
151151
_session0.Transmit(in cmd);
152152
if (hpe.CanShutdownCleanly)
153153
{
154-
ClosingLoop();
154+
await ClosingLoop();
155155
}
156156
}
157157
catch (IOException ioe)
@@ -168,13 +168,13 @@ private void HardProtocolExceptionHandler(HardProtocolException hpe)
168168
///<remarks>
169169
/// Loop only used while quiescing. Use only to cleanly close connection
170170
///</remarks>
171-
private void ClosingLoop()
171+
private async Task ClosingLoop()
172172
{
173173
try
174174
{
175175
_frameHandler.ReadTimeout = TimeSpan.Zero;
176176
// Wait for response/socket closure or timeout
177-
ReceiveLoop();
177+
await ReceiveLoop();
178178
}
179179
catch (ObjectDisposedException ode)
180180
{

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler)
7979
["connection_name"] = ClientProvidedName
8080
};
8181

82-
_mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning);
82+
_mainLoopTask = Task.Run(MainLoop);
8383
try
8484
{
8585
Open();

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 79 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@
3131

3232
using System;
3333
using System.Buffers;
34+
using System.Diagnostics;
3435
using System.IO;
36+
using System.IO.Pipelines;
3537
using System.Net.Sockets;
3638
using System.Runtime.CompilerServices;
3739
using System.Runtime.ExceptionServices;
40+
using System.Threading.Tasks;
3841

3942
using RabbitMQ.Client.Exceptions;
4043
using RabbitMQ.Client.Framing.Impl;
@@ -130,14 +133,14 @@ internal static class Heartbeat
130133

131134
/// <summary>
132135
/// Compiler trick to directly refer to static data in the assembly, see here: https://github.com/dotnet/roslyn/pull/24621
133-
/// </summary>
134-
private static ReadOnlySpan<byte> Payload => new byte[]
135-
{
136-
Constants.FrameHeartbeat,
137-
0, 0, // channel
138-
0, 0, 0, 0, // payload length
139-
Constants.FrameEnd
140-
};
136+
/// A heartbeat frame has the following layout:
137+
/// +--------------------+------------------+-----------------+--------------------------+
138+
/// | Frame Type(1 byte) | Channel(2 bytes) | Length(4 bytes) | End Frame Marker(1 byte) |
139+
/// +--------------------+------------------+-----------------+--------------------------+
140+
/// | 0x08 | 0x0000 | 0x00000000 | 0xCE |
141+
/// +--------------------+------------------+-----------------+--------------------------+
142+
///</summary>
143+
private static ReadOnlySpan<byte> Payload => new byte[] { Constants.FrameHeartbeat, 0, 0, 0, 0, 0, 0, Constants.FrameEnd };
141144

142145
public static Memory<byte> GetHeartbeatFrame()
143146
{
@@ -201,7 +204,7 @@ private static int GetBodyFrameCount(int maxPayloadBytes, int length)
201204
}
202205
}
203206

204-
internal readonly ref struct InboundFrame
207+
internal readonly struct InboundFrame
205208
{
206209
public readonly FrameType Type;
207210
public readonly int Channel;
@@ -216,22 +219,24 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
216219
_rentedArray = rentedArray;
217220
}
218221

219-
private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> frameHeader)
222+
private static void ProcessProtocolHeader(ReadOnlySequence<byte> buffer)
220223
{
221224
try
222225
{
223-
if (frameHeader[0] != 'M' || frameHeader[1] != 'Q' || frameHeader[2] != 'P')
226+
if (buffer.Length < 8)
224227
{
225-
throw new MalformedFrameException("Invalid AMQP protocol header from server");
228+
throw new EndOfStreamException();
226229
}
227230

228-
int serverMinor = reader.ReadByte();
229-
if (serverMinor == -1)
231+
Span<byte> tempSpan = stackalloc byte[8];
232+
buffer.Slice(0, 8).CopyTo(tempSpan);
233+
234+
if (tempSpan[1] != 'M' || tempSpan[2] != 'Q' || tempSpan[3] != 'P')
230235
{
231-
throw new EndOfStreamException();
236+
throw new MalformedFrameException("Invalid AMQP protocol header from server");
232237
}
233238

234-
throw new PacketNotRecognizedException(frameHeader[3], frameHeader[4], frameHeader[5], serverMinor);
239+
throw new PacketNotRecognizedException(tempSpan[4], tempSpan[5], tempSpan[6], tempSpan[7]);
235240
}
236241
catch (EndOfStreamException)
237242
{
@@ -246,38 +251,59 @@ private static void ProcessProtocolHeader(Stream reader, ReadOnlySpan<byte> fram
246251
}
247252
}
248253

249-
internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, uint maxMessageSize)
254+
internal static async ValueTask<InboundFrame> ReadFromPipe(PipeReader reader, uint maxMessageSize)
250255
{
251-
try
256+
// Try a synchronous read first, then go async
257+
if (!reader.TryRead(out ReadResult result))
252258
{
253-
ReadFromStream(reader, frameHeaderBuffer, frameHeaderBuffer.Length);
259+
result = await reader.ReadAsync().ConfigureAwait(false);
254260
}
255-
catch (IOException ioe)
261+
262+
ReadOnlySequence<byte> buffer = result.Buffer;
263+
264+
if (result.IsCompleted || buffer.Length == 0)
256265
{
257-
// If it's a WSAETIMEDOUT SocketException, unwrap it.
258-
// This might happen when the limit of half-open connections is
259-
// reached.
260-
if (ioe?.InnerException is SocketException exception && exception.SocketErrorCode == SocketError.TimedOut)
261-
{
262-
ExceptionDispatchInfo.Capture(exception).Throw();
263-
}
264-
else
266+
throw new EndOfStreamException("Pipe is completed.");
267+
}
268+
269+
byte firstByte = buffer.First.Span[0];
270+
if (firstByte == 'A')
271+
{
272+
ProcessProtocolHeader(buffer);
273+
}
274+
275+
InboundFrame frame;
276+
277+
while (!TryReadFrame(ref buffer, maxMessageSize, out frame))
278+
{
279+
reader.AdvanceTo(buffer.Start, buffer.End);
280+
281+
// No need to try a synchronous read since we have an incomplete frame anyway, so we'll always need to go async
282+
result = await reader.ReadAsync().ConfigureAwait(false);
283+
284+
if (result.IsCompleted || buffer.Length == 0)
265285
{
266-
throw;
286+
throw new EndOfStreamException("Pipe is completed.");
267287
}
288+
289+
buffer = result.Buffer;
268290
}
269291

270-
byte firstByte = frameHeaderBuffer[0];
271-
if (firstByte == 'A')
292+
reader.AdvanceTo(buffer.Start);
293+
return frame;
294+
}
295+
296+
internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, uint maxMessageSize, out InboundFrame frame)
297+
{
298+
if(buffer.Length < 7)
272299
{
273-
// Probably an AMQP protocol header, otherwise meaningless
274-
ProcessProtocolHeader(reader, frameHeaderBuffer.AsSpan(1, 6));
300+
frame = default;
301+
return false;
275302
}
276303

277-
FrameType type = (FrameType)firstByte;
278-
var frameHeaderSpan = new ReadOnlySpan<byte>(frameHeaderBuffer, 1, 6);
279-
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderSpan);
280-
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderSpan.Slice(2, 4));
304+
FrameType type = (FrameType)buffer.First.Span[0];
305+
int channel = NetworkOrderDeserializer.ReadUInt16(buffer.Slice(1));
306+
int payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.Slice(3));
281307
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
282308
{
283309
string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes";
@@ -287,46 +313,29 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, u
287313
const int EndMarkerLength = 1;
288314
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
289315
int readSize = payloadSize + EndMarkerLength;
290-
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
291-
try
292-
{
293-
ReadFromStream(reader, payloadBytes, readSize);
294-
}
295-
catch (Exception)
296-
{
297-
// Early EOF.
298-
ArrayPool<byte>.Shared.Return(payloadBytes);
299-
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes");
300-
}
301316

302-
if (payloadBytes[payloadSize] != Constants.FrameEnd)
317+
if ((buffer.Length - 7) < readSize)
303318
{
304-
ArrayPool<byte>.Shared.Return(payloadBytes);
305-
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
319+
frame = default;
320+
return false;
306321
}
307-
308-
RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize);
309-
return new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
310-
}
311-
312-
[MethodImpl(MethodImplOptions.AggressiveInlining)]
313-
private static void ReadFromStream(Stream reader, byte[] buffer, int toRead)
314-
{
315-
int bytesRead = 0;
316-
do
322+
else
317323
{
318-
int read = reader.Read(buffer, bytesRead, toRead - bytesRead);
319-
if (read == 0)
324+
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
325+
ReadOnlySequence<byte> framePayload = buffer.Slice(7, readSize);
326+
framePayload.CopyTo(payloadBytes);
327+
328+
if (payloadBytes[payloadSize] != Constants.FrameEnd)
320329
{
321-
ThrowEndOfStream();
330+
ArrayPool<byte>.Shared.Return(payloadBytes);
331+
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
322332
}
323333

324-
bytesRead += read;
325-
} while (bytesRead != toRead);
326-
327-
static void ThrowEndOfStream()
328-
{
329-
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
334+
RabbitMqClientEventSource.Log.DataReceived(payloadSize + Framing.BaseFrameSize);
335+
frame = new InboundFrame(type, channel, new Memory<byte>(payloadBytes, 0, payloadSize), payloadBytes);
336+
// Advance the buffer
337+
buffer = buffer.Slice(7 + readSize);
338+
return true;
330339
}
331340
}
332341

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Net;
34+
using System.Threading.Tasks;
3435

3536
namespace RabbitMQ.Client.Impl
3637
{
@@ -57,7 +58,7 @@ internal interface IFrameHandler
5758
///<summary>Read a frame from the underlying
5859
///transport. Returns null if the read operation timed out
5960
///(see Timeout property).</summary>
60-
InboundFrame ReadFrame();
61+
ValueTask<InboundFrame> ReadFrame();
6162

6263
void SendHeader();
6364

projects/RabbitMQ.Client/client/impl/Session.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
// Copyright (c) 2007-2020 VMware, Inc. All rights reserved.
3030
//---------------------------------------------------------------------------
3131

32+
using System.Diagnostics;
33+
3234
using RabbitMQ.Client.Framing.Impl;
3335

3436
namespace RabbitMQ.Client.Impl

0 commit comments

Comments
 (0)