Skip to content

Commit 8c8a399

Browse files
author
Stefán J. Sigurðarson
committed
Further improvements
1 parent 3af97ef commit 8c8a399

File tree

7 files changed

+323
-204
lines changed

7 files changed

+323
-204
lines changed

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 58 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.Buffers;
3334
using System.Collections.Generic;
3435
using System.IO;
3536
using System.Net;
@@ -70,7 +71,6 @@ internal sealed class Connection : IConnection
7071
private volatile bool _running = true;
7172
private readonly MainSession _session0;
7273
private SessionManager _sessionManager;
73-
internal SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
7474

7575
//
7676
// Heartbeats
@@ -94,8 +94,6 @@ internal sealed class Connection : IConnection
9494
// errors, otherwise as read timeouts
9595
public ConsumerWorkService ConsumerWorkService { get; }
9696

97-
internal IFrameHandler FrameHandler => _frameHandler;
98-
9997
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, string clientProvidedName = null)
10098
{
10199
ClientProvidedName = clientProvidedName;
@@ -346,7 +344,8 @@ public async ValueTask ClosingLoop()
346344
// Wait for response/socket closure or timeout
347345
while (!_closed)
348346
{
349-
await MainLoopIteration().ConfigureAwait(false);
347+
await _frameHandler.FrameReader.WaitToReadAsync().ConfigureAwait(false);
348+
MainLoopIteration();
350349
}
351350
}
352351
catch (ObjectDisposedException ode)
@@ -487,9 +486,15 @@ public async Task MainLoop()
487486
{
488487
while (_running)
489488
{
489+
ValueTask<bool> waitToRead = _frameHandler.FrameReader.WaitToReadAsync();
490+
if (!waitToRead.IsCompleted)
491+
{
492+
await waitToRead.ConfigureAwait(false);
493+
}
494+
490495
try
491496
{
492-
await MainLoopIteration().ConfigureAwait(false);
497+
MainLoopIteration();
493498
}
494499
catch (SoftProtocolException spe)
495500
{
@@ -545,51 +550,53 @@ public async Task MainLoop()
545550
}
546551
}
547552

548-
public async ValueTask MainLoopIteration()
553+
public void MainLoopIteration()
549554
{
550-
InboundFrame frame = await _frameHandler.ReadFrame().ConfigureAwait(false);
551-
NotifyHeartbeatListener();
552-
553-
bool shallReturn = true;
554-
// We have received an actual frame.
555-
if (frame.Type == FrameType.FrameHeartbeat)
556-
{
557-
// Ignore it: we've already just reset the heartbeat
558-
}
559-
else if (frame.Channel == 0)
560-
{
561-
// In theory, we could get non-connection.close-ok
562-
// frames here while we're quiescing (m_closeReason !=
563-
// null). In practice, there's a limited number of
564-
// things the server can ask of us on channel 0 -
565-
// essentially, just connection.close. That, combined
566-
// with the restrictions on pipelining, mean that
567-
// we're OK here to handle channel 0 traffic in a
568-
// quiescing situation, even though technically we
569-
// should be ignoring everything except
570-
// connection.close-ok.
571-
shallReturn = _session0.HandleFrame(in frame);
572-
}
573-
else
555+
while (_frameHandler.FrameReader.TryRead(out InboundFrame frame))
574556
{
575-
// If we're still m_running, but have a m_closeReason,
576-
// then we must be quiescing, which means any inbound
577-
// frames for non-zero channels (and any inbound
578-
// commands on channel zero that aren't
579-
// Connection.CloseOk) must be discarded.
580-
if (_closeReason is null)
557+
NotifyHeartbeatListener();
558+
559+
bool shallReturn = true;
560+
// We have received an actual frame.
561+
if (frame.Type == FrameType.FrameHeartbeat)
581562
{
582-
// No close reason, not quiescing the
583-
// connection. Handle the frame. (Of course, the
584-
// Session itself may be quiescing this particular
585-
// channel, but that's none of our concern.)
586-
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
563+
// Ignore it: we've already just reset the heartbeat
564+
}
565+
else if (frame.Channel == 0)
566+
{
567+
// In theory, we could get non-connection.close-ok
568+
// frames here while we're quiescing (m_closeReason !=
569+
// null). In practice, there's a limited number of
570+
// things the server can ask of us on channel 0 -
571+
// essentially, just connection.close. That, combined
572+
// with the restrictions on pipelining, mean that
573+
// we're OK here to handle channel 0 traffic in a
574+
// quiescing situation, even though technically we
575+
// should be ignoring everything except
576+
// connection.close-ok.
577+
shallReturn = _session0.HandleFrame(in frame);
578+
}
579+
else
580+
{
581+
// If we're still m_running, but have a m_closeReason,
582+
// then we must be quiescing, which means any inbound
583+
// frames for non-zero channels (and any inbound
584+
// commands on channel zero that aren't
585+
// Connection.CloseOk) must be discarded.
586+
if (_closeReason is null)
587+
{
588+
// No close reason, not quiescing the
589+
// connection. Handle the frame. (Of course, the
590+
// Session itself may be quiescing this particular
591+
// channel, but that's none of our concern.)
592+
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
593+
}
587594
}
588-
}
589595

590-
if (shallReturn)
591-
{
592-
frame.ReturnPayload();
596+
if (shallReturn)
597+
{
598+
frame.ReturnPayload();
599+
}
593600
}
594601
}
595602

@@ -877,12 +884,7 @@ public void HeartbeatWriteTimerCallback(object state)
877884
{
878885
if (!_closed)
879886
{
880-
_writeLock.Wait();
881-
Span<byte> span = _frameHandler.PipeWriter.GetSpan(Client.Impl.Framing.Heartbeat.Payload.Length);
882-
Client.Impl.Framing.Heartbeat.Payload.CopyTo(span);
883-
_frameHandler.PipeWriter.Advance(Client.Impl.Framing.Heartbeat.Payload.Length);
884-
_frameHandler.Flush();
885-
_writeLock.Release();
887+
Write(OutgoingFrame.CreateHeartbeat());
886888
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
887889
}
888890
}
@@ -919,6 +921,11 @@ public override string ToString()
919921
return string.Format("Connection({0},{1})", _id, Endpoint);
920922
}
921923

924+
public void Write(OutgoingFrame outgoingFrame)
925+
{
926+
_frameHandler.Write(outgoingFrame);
927+
}
928+
922929
public void UpdateSecret(string newSecret, string reason)
923930
{
924931
_model0.UpdateSecret(newSecret, reason);

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 64 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,7 @@
3131

3232
using System;
3333
using System.Buffers;
34-
using System.IO;
35-
using System.IO.Pipelines;
36-
using System.Net.Sockets;
3734
using System.Runtime.CompilerServices;
38-
using System.Runtime.ExceptionServices;
39-
using System.Threading.Tasks;
4035

4136
using RabbitMQ.Client.Exceptions;
4237
using RabbitMQ.Util;
@@ -67,6 +62,16 @@ private static int WriteBaseFrame(Span<byte> span, FrameType type, ushort channe
6762
return StartPayload + 1 + payloadLength;
6863
}
6964

65+
internal static class Heartbeat
66+
{
67+
public const int FrameSize = BaseFrameSize;
68+
69+
public static int WriteTo(Span<byte> span)
70+
{
71+
return WriteBaseFrame(span, FrameType.FrameHeartbeat, 0, 0);
72+
}
73+
}
74+
7075
internal static class Method
7176
{
7277
/* +----------+-----------+-----------+
@@ -130,23 +135,6 @@ public static int WriteTo(Span<byte> span, ushort channel, ReadOnlySpan<byte> bo
130135
return WriteBaseFrame(span, FrameType.FrameBody, channel, StartBodyArgument - StartPayload + body.Length);
131136
}
132137
}
133-
134-
internal static class Heartbeat
135-
{
136-
/* Empty frame */
137-
public const int FrameSize = BaseFrameSize;
138-
139-
/// <summary>
140-
/// Compiler trick to directly refer to static data in the assembly, see here: https://github.com/dotnet/roslyn/pull/24621
141-
/// </summary>
142-
internal static ReadOnlySpan<byte> Payload => new byte[]
143-
{
144-
Constants.FrameHeartbeat,
145-
0, 0, // channel
146-
0, 0, 0, 0, // payload length
147-
Constants.FrameEnd
148-
};
149-
}
150138
}
151139

152140
internal readonly struct InboundFrame
@@ -164,96 +152,88 @@ private InboundFrame(FrameType type, int channel, ReadOnlyMemory<byte> payload,
164152
_rentedArray = rentedArray;
165153
}
166154

167-
private static async ValueTask ProcessProtocolHeader(PipeReader reader, ReadOnlyMemory<byte> protocolError)
155+
private static void ProcessProtocolHeader(ReadOnlySpan<byte> protocolError)
168156
{
169-
try
170-
{
171-
byte b1 = protocolError.Span[0];
172-
byte b2 = protocolError.Span[1];
173-
byte b3 = protocolError.Span[2];
174-
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
175-
{
176-
throw new MalformedFrameException("Invalid AMQP protocol header from server");
177-
}
178-
179-
int transportHigh = protocolError.Span[3];
180-
int transportLow = protocolError.Span[4];
181-
int serverMajor = protocolError.Span[5];
182-
int serverMinor = await reader.ReadByteAsync().ConfigureAwait(false); ;
183-
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
184-
}
185-
catch (EndOfStreamException)
157+
byte b1 = protocolError[0];
158+
byte b2 = protocolError[1];
159+
byte b3 = protocolError[2];
160+
if (b1 != 'M' || b2 != 'Q' || b3 != 'P')
186161
{
187-
// Ideally we'd wrap the EndOfStreamException in the
188-
// MalformedFrameException, but unfortunately the
189-
// design of MalformedFrameException's superclass,
190-
// ProtocolViolationException, doesn't permit
191-
// this. Fortunately, the call stack in the
192-
// EndOfStreamException is largely irrelevant at this
193-
// point, so can safely be ignored.
194162
throw new MalformedFrameException("Invalid AMQP protocol header from server");
195163
}
164+
165+
int transportHigh = protocolError[3];
166+
int transportLow = protocolError[4];
167+
int serverMajor = protocolError[5];
168+
int serverMinor = protocolError[6];
169+
throw new PacketNotRecognizedException(transportHigh, transportLow, serverMajor, serverMinor);
196170
}
197171

198-
internal static async ValueTask<InboundFrame> ReadFrom(PipeReader reader, Memory<byte> frameHeaderBuffer)
172+
internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, out InboundFrame frame)
199173
{
200-
// We'll always need to read at least 7 bytes (type + channel + payloadSize) or (type + first 6 bytes of protocol error, see ProcessProtocolHeader).
201-
int type;
202-
try
174+
// We'll always need to read at least 8 bytes (type (1) + channel (2) + payloadSize (4) + end marker (1)) or (8 bytes of protocol error, see ProcessProtocolHeader).
175+
if (buffer.Length < 8)
203176
{
204-
await reader.ReadAsync(frameHeaderBuffer.Slice(0, 7)).ConfigureAwait(false);
205-
206-
type = frameHeaderBuffer.Span[0];
177+
frame = default;
178+
return false;
207179
}
208-
catch (IOException ioe)
180+
181+
if (buffer.First.Span[0] == 'A')
209182
{
210-
// If it's a WSAETIMEDOUT SocketException, unwrap it.
211-
// This might happen when the limit of half-open connections is
212-
// reached.
213-
if (ioe.InnerException is SocketException exception && exception.SocketErrorCode == SocketError.TimedOut)
183+
// Probably an AMQP protocol header, otherwise meaningless
184+
if (buffer.First.Length >= 8)
214185
{
215-
ExceptionDispatchInfo.Capture(exception).Throw();
186+
ProcessProtocolHeader(buffer.First.Span.Slice(1, 7));
187+
}
188+
else
189+
{
190+
Span<byte> protocolError = stackalloc byte[7];
191+
buffer.Slice(1, 7).CopyTo(protocolError);
192+
ProcessProtocolHeader(protocolError);
216193
}
217-
218-
throw;
219194
}
220195

221-
switch (type)
196+
int type = buffer.First.Span[0];
197+
int channel;
198+
int payloadSize;
199+
200+
if (buffer.First.Length >= 7)
222201
{
223-
case -1:
224-
throw new EndOfStreamException("Reached the end of the stream. Possible authentication failure.");
225-
case 'A':
226-
// Probably an AMQP protocol header, otherwise meaningless
227-
await ProcessProtocolHeader(reader, frameHeaderBuffer.Slice(1, 6)).ConfigureAwait(false);
228-
break;
202+
channel = NetworkOrderDeserializer.ReadUInt16(buffer.First.Span.Slice(1, 2));
203+
payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.First.Span.Slice(3, 4)); // FIXME - throw exn on unreasonable value
204+
}
205+
else
206+
{
207+
Span<byte> headerBytes = stackalloc byte[6];
208+
buffer.Slice(1, 6).CopyTo(headerBytes);
209+
channel = NetworkOrderDeserializer.ReadUInt16(headerBytes.Slice(0, 2));
210+
payloadSize = NetworkOrderDeserializer.ReadInt32(headerBytes.Slice(2, 4)); // FIXME - throw exn on unreasonable value
229211
}
230-
231-
int channel = NetworkOrderDeserializer.ReadUInt16(frameHeaderBuffer.Span.Slice(1, 2));
232-
int payloadSize = NetworkOrderDeserializer.ReadInt32(frameHeaderBuffer.Span.Slice(3, 4)); // FIXME - throw exn on unreasonable value
233212

234213
const int EndMarkerLength = 1;
235-
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
236214
int readSize = payloadSize + EndMarkerLength;
237-
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
238-
Memory<byte> payloadMemory = payloadBytes.AsMemory(0, readSize);
239-
try
240-
{
241-
await reader.ReadAsync(payloadMemory).ConfigureAwait(false);
242-
}
243-
catch (Exception e)
215+
216+
// Do we have enough bytes to read an entire frame (type + channel + payloadSize + payload + end marker)
217+
if (buffer.Length < (7 + readSize))
244218
{
245-
// Early EOF.
246-
ArrayPool<byte>.Shared.Return(payloadBytes);
247-
throw new MalformedFrameException($"Short frame - expected to read {readSize} bytes: {e.Message}");
219+
frame = default;
220+
return false;
248221
}
249222

223+
// Is returned by InboundFrame.ReturnPayload in Connection.MainLoopIteration
224+
byte[] payloadBytes = ArrayPool<byte>.Shared.Rent(readSize);
225+
Memory<byte> payloadMemory = payloadBytes.AsMemory(0, readSize);
226+
ReadOnlySequence<byte> payloadSlice = buffer.Slice(7, readSize);
227+
payloadSlice.CopyTo(payloadMemory.Span);
250228
if (payloadBytes[payloadSize] != Constants.FrameEnd)
251229
{
252230
ArrayPool<byte>.Shared.Return(payloadBytes);
253231
throw new MalformedFrameException($"Bad frame end marker: {payloadBytes[payloadSize]}");
254232
}
255233

256-
return new InboundFrame((FrameType)type, channel, payloadMemory.Slice(0, payloadSize), payloadBytes);
234+
buffer = buffer.Slice(payloadSlice.End);
235+
frame = new InboundFrame((FrameType)type, channel, payloadMemory.Slice(0, payloadSize), payloadBytes);
236+
return true;
257237
}
258238

259239
public byte[] TakeoverPayload()

0 commit comments

Comments
 (0)