Skip to content

Commit 6c3871e

Browse files
author
Stefán J. Sigurðarson
committed
Reducing thread usage for connections, merging the pipeline read loop into the MainLoop in the Connection, getting rid of those channels while at it.
1 parent 3488de4 commit 6c3871e

File tree

3 files changed

+63
-85
lines changed

3 files changed

+63
-85
lines changed

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
using System.Buffers;
3434
using System.Collections.Generic;
3535
using System.IO;
36+
using System.IO.Pipelines;
3637
using System.Net;
3738
using System.Net.Sockets;
3839
using System.Reflection;
@@ -342,10 +343,29 @@ public async ValueTask ClosingLoop()
342343
{
343344
_frameHandler.ReadTimeout = TimeSpan.Zero;
344345
// Wait for response/socket closure or timeout
346+
bool allowSync = true;
345347
while (!_closed)
346348
{
347-
await _frameHandler.FrameReader.WaitToReadAsync().ConfigureAwait(false);
348-
MainLoopIteration();
349+
// Let's read some bytes
350+
if (!(allowSync && _frameHandler.PipeReader.TryRead(out ReadResult readResult)))
351+
{
352+
readResult = await _frameHandler.PipeReader.ReadAsync().ConfigureAwait(false);
353+
}
354+
355+
int handled = 0;
356+
ReadOnlySequence<byte> buffer = readResult.Buffer;
357+
if (!buffer.IsEmpty)
358+
{
359+
handled = MainLoopIteration(ref buffer);
360+
}
361+
362+
allowSync = handled != 0;
363+
_frameHandler.PipeReader.AdvanceTo(buffer.Start, buffer.End);
364+
365+
if (handled == 0 && readResult.IsCompleted)
366+
{
367+
throw new EndOfStreamException();
368+
}
349369
}
350370
}
351371
catch (ObjectDisposedException ode)
@@ -484,17 +504,31 @@ public async Task MainLoop()
484504
bool shutdownCleanly = false;
485505
try
486506
{
507+
bool allowSync = true;
487508
while (_running)
488509
{
489-
ValueTask<bool> waitToRead = _frameHandler.FrameReader.WaitToReadAsync();
490-
if (!waitToRead.IsCompleted)
491-
{
492-
await waitToRead.ConfigureAwait(false);
493-
}
494-
495510
try
496511
{
497-
MainLoopIteration();
512+
// Let's read some bytes
513+
if (!(allowSync && _frameHandler.PipeReader.TryRead(out ReadResult readResult)))
514+
{
515+
readResult = await _frameHandler.PipeReader.ReadAsync().ConfigureAwait(false);
516+
}
517+
518+
int handled = 0;
519+
ReadOnlySequence<byte> buffer = readResult.Buffer;
520+
if (!buffer.IsEmpty)
521+
{
522+
handled = MainLoopIteration(ref buffer);
523+
}
524+
525+
allowSync = handled != 0;
526+
_frameHandler.PipeReader.AdvanceTo(buffer.Start, buffer.End);
527+
528+
if (handled == 0 && readResult.IsCompleted)
529+
{
530+
throw new EndOfStreamException();
531+
}
498532
}
499533
catch (SoftProtocolException spe)
500534
{
@@ -550,9 +584,10 @@ public async Task MainLoop()
550584
}
551585
}
552586

553-
public void MainLoopIteration()
587+
public int MainLoopIteration(ref ReadOnlySequence<byte> buffer)
554588
{
555-
while (_frameHandler.FrameReader.TryRead(out InboundFrame frame))
589+
int handled = 0;
590+
while (InboundFrame.TryReadFrame(ref buffer, out InboundFrame frame))
556591
{
557592
NotifyHeartbeatListener();
558593

@@ -597,7 +632,11 @@ public void MainLoopIteration()
597632
{
598633
frame.ReturnPayload();
599634
}
635+
636+
handled++;
600637
}
638+
639+
return handled;
601640
}
602641

603642
private void NotifyHeartbeatListener()

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33+
using System.IO.Pipelines;
3334
using System.Net;
34-
using System.Threading.Channels;
3535

3636
namespace RabbitMQ.Client.Impl
3737
{
@@ -53,7 +53,7 @@ internal interface IFrameHandler
5353
///<summary>Socket write timeout. System.Threading.Timeout.InfiniteTimeSpan signals "infinity".</summary>
5454
TimeSpan WriteTimeout { set; }
5555

56-
ChannelReader<InboundFrame> FrameReader { get; }
56+
PipeReader PipeReader { get; }
5757

5858
void Close();
5959

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 11 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Buffers;
3433
using System.IO;
3534
using System.IO.Pipelines;
3635
using System.Net;
@@ -71,12 +70,10 @@ internal class SocketFrameHandler : IFrameHandler
7170
private readonly ITcpClient _socket;
7271
private readonly ChannelWriter<OutgoingFrame> _channelWriter;
7372
private readonly ChannelReader<OutgoingFrame> _channelReader;
74-
private readonly ChannelWriter<InboundFrame> _frameWriter;
75-
public ChannelReader<InboundFrame> FrameReader { get; }
76-
private readonly PipeReader _pipeReader;
73+
public PipeReader PipeReader { get; private set; }
74+
7775
private readonly PipeWriter _pipeWriter;
7876
private readonly Task _writerTask;
79-
private readonly Task _readerTask;
8077
private readonly object _semaphore = new object();
8178
private readonly byte[] _frameHeaderBuffer;
8279
private readonly IMeasuredDuplexPipe _pipe;
@@ -94,18 +91,8 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func<AddressFamily, ITcpClie
9491
SingleWriter = false
9592
});
9693

97-
var frameChannel = Channel.CreateUnbounded<InboundFrame>(
98-
new UnboundedChannelOptions
99-
{
100-
AllowSynchronousContinuations = true,
101-
SingleReader = true,
102-
SingleWriter = true
103-
});
104-
10594
_channelReader = channel.Reader;
10695
_channelWriter = channel.Writer;
107-
_frameWriter = frameChannel.Writer;
108-
FrameReader = frameChannel.Reader;
10996

11097
// Resolve the hostname to know if it's even possible to even try IPv6
11198
IPAddress[] adds = Dns.GetHostAddresses(endpoint.HostName);
@@ -162,12 +149,11 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint, Func<AddressFamily, ITcpClie
162149
_pipe = SocketConnection.Create(_socket.Client);
163150
}
164151

165-
_pipeReader = _pipe.Input;
152+
PipeReader = _pipe.Input;
166153
_pipeWriter = _pipe.Output;
167154

168155
WriteTimeout = writeTimeout;
169156
_writerTask = Task.Run(WriteLoop, CancellationToken.None);
170-
_readerTask = Task.Run(ReadLoop, CancellationToken.None);
171157
}
172158
public AmqpTcpEndpoint Endpoint { get; set; }
173159

@@ -226,7 +212,6 @@ public void Close()
226212
if (!_closed)
227213
{
228214
_channelWriter.TryComplete();
229-
_frameWriter.TryComplete();
230215
try { _writerTask.GetAwaiter().GetResult(); } catch { }
231216

232217
if (_pipe != null)
@@ -246,7 +231,7 @@ public void Close()
246231

247232
public void SendHeader()
248233
{
249-
Span<byte> headerBytes = stackalloc byte[8];
234+
Span<byte> headerBytes = _pipeWriter.GetSpan(8);
250235
headerBytes[0] = (byte)'A';
251236
headerBytes[1] = (byte)'M';
252237
headerBytes[2] = (byte)'Q';
@@ -266,64 +251,18 @@ public void SendHeader()
266251
headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion;
267252
}
268253

269-
Memory<byte> pipeMemory = _pipeWriter.GetMemory(8);
270-
headerBytes.CopyTo(pipeMemory.Span);
271-
_pipeWriter.Advance(headerBytes.Length);
272-
_pipeWriter.FlushAsync().AsTask().GetAwaiter().GetResult();
273-
}
274-
275-
public void Write(OutgoingFrame frame)
276-
{
277-
_channelWriter.TryWrite(frame);
278-
}
279-
280-
private async Task ReadLoop()
281-
{
282-
bool allowSync = true;
283-
try
254+
_pipeWriter.Advance(8);
255+
if (!_pipeWriter.FlushAsync().AsTask().Wait(_writeableStateTimeout))
284256
{
285-
while (true)
286-
{
287-
// Let's read some bytes
288-
if (!(allowSync && _pipeReader.TryRead(out ReadResult readResult)))
289-
{
290-
readResult = await _pipeReader.ReadAsync().ConfigureAwait(false);
291-
}
292-
293-
int handled = 0;
294-
ReadOnlySequence<byte> buffer = readResult.Buffer;
295-
if (!buffer.IsEmpty)
296-
{
297-
handled = ProcessBuffer(ref buffer);
298-
}
299-
300-
allowSync = handled != 0;
301-
_pipeReader.AdvanceTo(buffer.Start, buffer.End);
302-
303-
if (handled == 0 && readResult.IsCompleted)
304-
{
305-
break;
306-
}
307-
}
257+
var timeout = new TimeoutException();
258+
_pipeWriter.Complete(timeout);
259+
_channelWriter.Complete(timeout);
308260
}
309-
catch (Exception e)
310-
{
311-
_frameWriter.TryComplete(e);
312-
}
313-
314-
_frameWriter.TryComplete(new EndOfStreamException());
315261
}
316262

317-
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
263+
public void Write(OutgoingFrame frame)
318264
{
319-
int handled = 0;
320-
while (InboundFrame.TryReadFrame(ref buffer, out InboundFrame frame))
321-
{
322-
_frameWriter.TryWrite(frame);
323-
handled++;
324-
}
325-
326-
return handled;
265+
_channelWriter.TryWrite(frame);
327266
}
328267

329268
private async Task WriteLoop()

0 commit comments

Comments
 (0)