Skip to content

Commit 87d6f0a

Browse files
committed
Removing unnecessary List creation when splitting a command into frames.
Using a Channel to buffer outbound frames, moving socket writes to a background task and getting rid of the streamlock since there is only ever one writer accessing it.
1 parent ac5e2ab commit 87d6f0a

File tree

6 files changed

+40
-92
lines changed

6 files changed

+40
-92
lines changed

_site

Submodule _site updated 161 files

projects/RabbitMQ.Client/client/impl/Command.cs

Lines changed: 2 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -98,64 +98,20 @@ public static void CheckEmptyFrameSize()
9898
}
9999

100100
internal void Transmit(int channelNumber, Connection connection)
101-
{
102-
if (Method.HasContent)
103-
{
104-
TransmitAsFrameSet(channelNumber, connection);
105-
}
106-
else
107-
{
108-
TransmitAsSingleFrame(channelNumber, connection);
109-
}
110-
}
111-
112-
internal void TransmitAsSingleFrame(int channelNumber, Connection connection)
113101
{
114102
connection.WriteFrame(new MethodOutboundFrame(channelNumber, Method));
115-
}
116-
117-
internal void TransmitAsFrameSet(int channelNumber, Connection connection)
118-
{
119-
var frames = new List<OutboundFrame> { new MethodOutboundFrame(channelNumber, Method) };
120103
if (Method.HasContent)
121104
{
122-
frames.Add(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
105+
connection.WriteFrame(new HeaderOutboundFrame(channelNumber, Header, Body.Length));
123106
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
124107
int bodyPayloadMax = (frameMax == 0) ? Body.Length : frameMax - EmptyFrameSize;
125108
for (int offset = 0; offset < Body.Length; offset += bodyPayloadMax)
126109
{
127110
int remaining = Body.Length - offset;
128111
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
129-
frames.Add(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
112+
connection.WriteFrame(new BodySegmentOutboundFrame(channelNumber, Body.Slice(offset, count)));
130113
}
131114
}
132-
133-
connection.WriteFrameSet(frames);
134-
}
135-
136-
137-
internal static List<OutboundFrame> CalculateFrames(int channelNumber, Connection connection, IList<Command> commands)
138-
{
139-
var frames = new List<OutboundFrame>();
140-
141-
foreach (Command cmd in commands)
142-
{
143-
frames.Add(new MethodOutboundFrame(channelNumber, cmd.Method));
144-
if (cmd.Method.HasContent)
145-
{
146-
frames.Add(new HeaderOutboundFrame(channelNumber, cmd.Header, cmd.Body.Length));
147-
int frameMax = (int)Math.Min(int.MaxValue, connection.FrameMax);
148-
int bodyPayloadMax = (frameMax == 0) ? cmd.Body.Length : frameMax - EmptyFrameSize;
149-
for (int offset = 0; offset < cmd.Body.Length; offset += bodyPayloadMax)
150-
{
151-
int remaining = cmd.Body.Length - offset;
152-
int count = (remaining < bodyPayloadMax) ? remaining : bodyPayloadMax;
153-
frames.Add(new BodySegmentOutboundFrame(channelNumber, cmd.Body.Slice(offset, count)));
154-
}
155-
}
156-
}
157-
158-
return frames;
159115
}
160116

161117
public void Dispose()

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -948,11 +948,6 @@ public void WriteFrame(OutboundFrame f)
948948
_frameHandler.WriteFrame(f);
949949
}
950950

951-
public void WriteFrameSet(IList<OutboundFrame> f)
952-
{
953-
_frameHandler.WriteFrameSet(f);
954-
}
955-
956951
public void UpdateSecret(string newSecret, string reason)
957952
{
958953
_model0.UpdateSecret(newSecret, reason);
@@ -1010,7 +1005,7 @@ public IModel CreateModel()
10101005
{
10111006
EnsureIsOpen();
10121007
ISession session = CreateSession();
1013-
var model = (IFullModel)Protocol.CreateModel(session, ConsumerWorkService);
1008+
var model = (IFullModel)Protocol.CreateModel(session);
10141009
model.ContinuationTimeout = _factory.ContinuationTimeout;
10151010
model._Private_ChannelOpen("");
10161011
return model;

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ interface IFrameHandler
7171

7272
void SendHeader();
7373

74-
void WriteFrame(OutboundFrame frame, bool flush = true);
75-
76-
void WriteFrameSet(IList<OutboundFrame> frames);
74+
void WriteFrame(OutboundFrame frame);
7775
}
7876
}

projects/RabbitMQ.Client/client/impl/SessionBase.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,14 @@ public virtual void Transmit(Command cmd)
191191
// of frames within a channel. But that is fixed in socket frame handler instead, so no need to lock.
192192
cmd.Transmit(ChannelNumber, Connection);
193193
}
194+
194195
public virtual void Transmit(IList<Command> commands)
195196
{
196-
Connection.WriteFrameSet(Command.CalculateFrames(ChannelNumber, Connection, commands));
197+
for (int i = 0; i < commands.Count; i++)
198+
{
199+
Command command = commands[i];
200+
command.Transmit(ChannelNumber, Connection);
201+
}
197202
}
198203
}
199204
}

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 29 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
using System.Net.Sockets;
4747
using System.Runtime.InteropServices;
4848
using System.Text;
49+
using System.Threading.Channels;
4950
using System.Threading.Tasks;
5051

5152
using RabbitMQ.Client.Exceptions;
@@ -79,7 +80,8 @@ class SocketFrameHandler : IFrameHandler
7980
private readonly ITcpClient _socket;
8081
private readonly Stream _writer;
8182
private readonly object _semaphore = new object();
82-
private readonly object _streamLock = new object();
83+
private readonly Channel<OutboundFrame> _frameChannel = Channel.CreateUnbounded<OutboundFrame>(new UnboundedChannelOptions { AllowSynchronousContinuations = false, SingleReader = true, SingleWriter = false });
84+
private Task _frameWriter;
8385
private bool _closed;
8486
public SocketFrameHandler(AmqpTcpEndpoint endpoint,
8587
Func<AddressFamily, ITcpClient> socketFactory,
@@ -124,6 +126,7 @@ public SocketFrameHandler(AmqpTcpEndpoint endpoint,
124126
_writer = new BufferedStream(netstream, _socket.Client.SendBufferSize);
125127

126128
WriteTimeout = writeTimeout;
129+
_frameWriter = Task.Run(WriteFrameImpl);
127130
}
128131
public AmqpTcpEndpoint Endpoint { get; set; }
129132

@@ -181,6 +184,15 @@ public void Close()
181184
{
182185
if (!_closed)
183186
{
187+
try
188+
{
189+
_frameChannel.Writer.Complete();
190+
_frameWriter.Wait();
191+
}
192+
catch(Exception)
193+
{
194+
}
195+
184196
try
185197
{
186198
_socket.Close();
@@ -222,49 +234,31 @@ public void SendHeader()
222234
headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion;
223235
}
224236

225-
Write(new ArraySegment<byte>(headerBytes), true);
237+
_writer.Write(headerBytes, 0, 8);
238+
_writer.Flush();
226239
}
227240

228-
public void WriteFrame(OutboundFrame frame, bool flush = true)
241+
public void WriteFrame(OutboundFrame frame)
229242
{
230-
int bufferSize = frame.GetMinimumBufferSize();
231-
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
232-
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
233-
frame.WriteTo(slice);
234-
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
235-
Write(slice.Slice(0, frame.ByteCount), flush);
236-
ArrayPool<byte>.Shared.Return(memoryArray);
237-
return;
238-
239-
throw new InvalidOperationException("Unable to get array segment from memory.");
243+
_frameChannel.Writer.TryWrite(frame);
240244
}
241245

242-
public void WriteFrameSet(IList<OutboundFrame> frames)
246+
public async Task WriteFrameImpl()
243247
{
244-
for (int i = 0; i < frames.Count; i++)
248+
while (await _frameChannel.Reader.WaitToReadAsync().ConfigureAwait(false))
245249
{
246-
WriteFrame(frames[i], false);
247-
}
248-
249-
lock (_streamLock)
250-
{
251-
_writer.Flush();
252-
}
253-
}
254-
255-
private void Write(ReadOnlyMemory<byte> buffer, bool flush)
256-
{
257-
lock (_streamLock)
258-
{
259-
if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
250+
_socket.Client.Poll(_writeableStateTimeoutMicroSeconds, SelectMode.SelectWrite);
251+
while (_frameChannel.Reader.TryRead(out OutboundFrame frame))
260252
{
261-
_writer.Write(segment.Array, segment.Offset, segment.Count);
262-
263-
if (flush)
264-
{
265-
_writer.Flush();
266-
}
253+
int bufferSize = frame.GetMinimumBufferSize();
254+
byte[] memoryArray = ArrayPool<byte>.Shared.Rent(bufferSize);
255+
Memory<byte> slice = new Memory<byte>(memoryArray, 0, bufferSize);
256+
frame.WriteTo(slice);
257+
_writer.Write(memoryArray, 0, bufferSize);
258+
ArrayPool<byte>.Shared.Return(memoryArray);
267259
}
260+
261+
_writer.Flush();
268262
}
269263
}
270264

0 commit comments

Comments
 (0)