Skip to content

Commit 05a11c3

Browse files
committed
Adding sync TryReadFrame method.
Make sure read/write tasks for connections run on dedicated threads.
1 parent 4dd2eae commit 05a11c3

File tree

5 files changed

+99
-58
lines changed

5 files changed

+99
-58
lines changed

projects/RabbitMQ.Client/client/impl/Connection.Receive.cs

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -70,53 +70,64 @@ private async Task ReceiveLoop()
7070
{
7171
while (!_closed)
7272
{
73-
InboundFrame frame = await _frameHandler.ReadFrame().ConfigureAwait(false);
73+
while (_frameHandler.TryReadFrame(out InboundFrame frame))
74+
{
75+
NotifyHeartbeatListener();
76+
ProcessFrame(frame);
77+
}
78+
79+
// Done reading frames synchronously, go async
80+
InboundFrame asyncFrame = await _frameHandler.ReadFrameAsync().ConfigureAwait(false);
7481
NotifyHeartbeatListener();
82+
ProcessFrame(asyncFrame);
83+
}
84+
}
7585

76-
bool shallReturn = true;
77-
if (frame.Channel == 0)
86+
private void ProcessFrame(InboundFrame frame)
87+
{
88+
bool shallReturn = true;
89+
if (frame.Channel == 0)
90+
{
91+
if (frame.Type == FrameType.FrameHeartbeat)
7892
{
79-
if (frame.Type == FrameType.FrameHeartbeat)
80-
{
81-
// Ignore it: we've already just reset the heartbeat
82-
}
83-
else
84-
{
85-
// In theory, we could get non-connection.close-ok
86-
// frames here while we're quiescing (m_closeReason !=
87-
// null). In practice, there's a limited number of
88-
// things the server can ask of us on channel 0 -
89-
// essentially, just connection.close. That, combined
90-
// with the restrictions on pipelining, mean that
91-
// we're OK here to handle channel 0 traffic in a
92-
// quiescing situation, even though technically we
93-
// should be ignoring everything except
94-
// connection.close-ok.
95-
shallReturn = _session0.HandleFrame(in frame);
96-
}
93+
// Ignore it: we've already just reset the heartbeat
9794
}
9895
else
9996
{
100-
// If we're still m_running, but have a m_closeReason,
101-
// then we must be quiescing, which means any inbound
102-
// frames for non-zero channels (and any inbound
103-
// commands on channel zero that aren't
104-
// Connection.CloseOk) must be discarded.
105-
if (_closeReason is null)
106-
{
107-
// No close reason, not quiescing the
108-
// connection. Handle the frame. (Of course, the
109-
// Session itself may be quiescing this particular
110-
// channel, but that's none of our concern.)
111-
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
112-
}
97+
// In theory, we could get non-connection.close-ok
98+
// frames here while we're quiescing (m_closeReason !=
99+
// null). In practice, there's a limited number of
100+
// things the server can ask of us on channel 0 -
101+
// essentially, just connection.close. That, combined
102+
// with the restrictions on pipelining, mean that
103+
// we're OK here to handle channel 0 traffic in a
104+
// quiescing situation, even though technically we
105+
// should be ignoring everything except
106+
// connection.close-ok.
107+
shallReturn = _session0.HandleFrame(in frame);
113108
}
114-
115-
if (shallReturn)
109+
}
110+
else
111+
{
112+
// If we're still m_running, but have a m_closeReason,
113+
// then we must be quiescing, which means any inbound
114+
// frames for non-zero channels (and any inbound
115+
// commands on channel zero that aren't
116+
// Connection.CloseOk) must be discarded.
117+
if (_closeReason is null)
116118
{
117-
frame.ReturnPayload();
119+
// No close reason, not quiescing the
120+
// connection. Handle the frame. (Of course, the
121+
// Session itself may be quiescing this particular
122+
// channel, but that's none of our concern.)
123+
shallReturn = _sessionManager.Lookup(frame.Channel).HandleFrame(in frame);
118124
}
119125
}
126+
127+
if (shallReturn)
128+
{
129+
frame.ReturnPayload();
130+
}
120131
}
121132

122133
///<remarks>

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public Connection(ConnectionConfig config, IFrameHandler frameHandler)
7979
["connection_name"] = ClientProvidedName
8080
};
8181

82-
_mainLoopTask = Task.Run(MainLoop);
82+
_mainLoopTask = Task.Factory.StartNew(MainLoop, TaskCreationOptions.LongRunning).Unwrap();
8383
try
8484
{
8585
Open();

projects/RabbitMQ.Client/client/impl/Frame.cs

Lines changed: 36 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -251,34 +251,23 @@ private static void ProcessProtocolHeader(ReadOnlySequence<byte> buffer)
251251
}
252252
}
253253

254-
internal static async ValueTask<InboundFrame> ReadFromPipe(PipeReader reader, uint maxMessageSize)
254+
internal static async ValueTask<InboundFrame> ReadFromPipeAsync(PipeReader reader, uint maxMessageSize)
255255
{
256-
// Try a synchronous read first, then go async
257-
if (!reader.TryRead(out ReadResult result))
258-
{
259-
result = await reader.ReadAsync().ConfigureAwait(false);
260-
}
256+
ReadResult result = await reader.ReadAsync().ConfigureAwait(false);
261257

262258
ReadOnlySequence<byte> buffer = result.Buffer;
263-
264259
if (result.IsCompleted || buffer.Length == 0)
265260
{
266261
throw new EndOfStreamException("Pipe is completed.");
267262
}
268263

269-
byte firstByte = buffer.First.Span[0];
270-
if (firstByte == 'A')
271-
{
272-
ProcessProtocolHeader(buffer);
273-
}
274-
275264
InboundFrame frame;
276-
265+
// Loop until we have enough data to read an entire frame, or until the pipe is completed.
277266
while (!TryReadFrame(ref buffer, maxMessageSize, out frame))
278267
{
279268
reader.AdvanceTo(buffer.Start, buffer.End);
280269

281-
// No need to try a synchronous read since we have an incomplete frame anyway, so we'll always need to go async
270+
// Not enough
282271
result = await reader.ReadAsync().ConfigureAwait(false);
283272

284273
if (result.IsCompleted || buffer.Length == 0)
@@ -293,6 +282,31 @@ internal static async ValueTask<InboundFrame> ReadFromPipe(PipeReader reader, ui
293282
return frame;
294283
}
295284

285+
internal static bool TryReadFrameFromPipe(PipeReader reader, uint maxMessageSize, out InboundFrame frame)
286+
{
287+
if (reader.TryRead(out ReadResult result))
288+
{
289+
ReadOnlySequence<byte> buffer = result.Buffer;
290+
if (result.IsCompleted || buffer.Length == 0)
291+
{
292+
throw new EndOfStreamException("Pipe is completed.");
293+
}
294+
295+
if (TryReadFrame(ref buffer, maxMessageSize, out frame))
296+
{
297+
reader.AdvanceTo(buffer.Start);
298+
return true;
299+
}
300+
301+
// We didn't read enough, so let's signal how much of the buffer we examined.
302+
reader.AdvanceTo(buffer.Start, buffer.End);
303+
}
304+
305+
// Failed to synchronously read sufficient data from the pipe. We'll need to go async.
306+
frame = default;
307+
return false;
308+
}
309+
296310
internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, uint maxMessageSize, out InboundFrame frame)
297311
{
298312
if (buffer.Length < 7)
@@ -301,7 +315,13 @@ internal static bool TryReadFrame(ref ReadOnlySequence<byte> buffer, uint maxMes
301315
return false;
302316
}
303317

304-
FrameType type = (FrameType)buffer.First.Span[0];
318+
byte firstByte = buffer.First.Span[0];
319+
if (firstByte == 'A')
320+
{
321+
ProcessProtocolHeader(buffer);
322+
}
323+
324+
FrameType type = (FrameType)firstByte;
305325
int channel = NetworkOrderDeserializer.ReadUInt16(buffer.Slice(1));
306326
int payloadSize = NetworkOrderDeserializer.ReadInt32(buffer.Slice(3));
307327
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,12 @@ internal interface IFrameHandler
5858
///<summary>Read a frame from the underlying
5959
///transport. Returns null if the read operation timed out
6060
///(see Timeout property).</summary>
61-
ValueTask<InboundFrame> ReadFrame();
61+
ValueTask<InboundFrame> ReadFrameAsync();
62+
63+
///<summary>Try to synchronously read a frame from the underlying transport.
64+
///Returns false if connection buffer contains insufficient data.
65+
///</summary>
66+
bool TryReadFrame(out InboundFrame frame);
6267

6368
void SendHeader();
6469

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
148148
_pipeReader = PipeReader.Create(netstream);
149149

150150
WriteTimeout = writeTimeout;
151-
_writerTask = Task.Run(WriteLoop);
151+
_writerTask = Task.Factory.StartNew(WriteLoop, TaskCreationOptions.LongRunning).Unwrap();
152152
}
153153

154154
public AmqpTcpEndpoint Endpoint
@@ -238,9 +238,14 @@ public void Close()
238238
}
239239
}
240240

241-
public ValueTask<InboundFrame> ReadFrame()
241+
public ValueTask<InboundFrame> ReadFrameAsync()
242242
{
243-
return InboundFrame.ReadFromPipe(_pipeReader, _amqpTcpEndpoint.MaxMessageSize);
243+
return InboundFrame.ReadFromPipeAsync(_pipeReader, _amqpTcpEndpoint.MaxMessageSize);
244+
}
245+
246+
public bool TryReadFrame(out InboundFrame frame)
247+
{
248+
return InboundFrame.TryReadFrameFromPipe(_pipeReader, _amqpTcpEndpoint.MaxMessageSize, out frame);
244249
}
245250

246251
public void SendHeader()

0 commit comments

Comments
 (0)