Skip to content

Commit a919f73

Browse files
author
Stefán J. Sigurðarson
committed
Improving Pipe/Socket cleanup. Improving IO Read loop.
1 parent 8c8a399 commit a919f73

File tree

2 files changed

+62
-42
lines changed

2 files changed

+62
-42
lines changed

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 59 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
using System.IO.Pipelines;
3636
using System.Net;
3737
using System.Net.Sockets;
38-
using System.Text;
3938
using System.Threading;
4039
using System.Threading.Channels;
4140
using System.Threading.Tasks;
@@ -226,37 +225,32 @@ public void Close()
226225
{
227226
if (!_closed)
228227
{
229-
try
230-
{
231-
_channelWriter.Complete();
232-
_writerTask.GetAwaiter().GetResult();
233-
_pipeReader.Complete();
234-
_pipeWriter.Complete();
235-
}
236-
catch (Exception)
237-
{
238-
}
228+
_channelWriter.TryComplete();
229+
_frameWriter.TryComplete();
230+
try { _writerTask.GetAwaiter().GetResult(); } catch { }
239231

240-
try
241-
{
242-
_socket.Close();
243-
}
244-
catch (Exception)
245-
{
246-
// ignore, we are closing anyway
247-
}
248-
finally
232+
if (_pipe != null)
249233
{
250-
_closed = true;
234+
try { _pipe.Input?.CancelPendingRead(); } catch { }
235+
try { _pipe.Input?.Complete(); } catch { }
236+
try { _pipe.Output?.CancelPendingFlush(); } catch { }
237+
try { _pipe.Output?.Complete(); } catch { }
238+
try { using (_pipe as IDisposable) { } } catch { }
251239
}
240+
241+
try { _socket.Close(); } catch { }
242+
_closed = true;
252243
}
253244
}
254245
}
255246

256247
public void SendHeader()
257248
{
258-
byte[] headerBytes = ArrayPool<byte>.Shared.Rent(8);
259-
Encoding.ASCII.GetBytes("AMQP", 0, 4, headerBytes, 0);
249+
Span<byte> headerBytes = stackalloc byte[8];
250+
headerBytes[0] = (byte)'A';
251+
headerBytes[1] = (byte)'M';
252+
headerBytes[2] = (byte)'Q';
253+
headerBytes[3] = (byte)'P';
260254
if (Endpoint.Protocol.Revision != 0)
261255
{
262256
headerBytes[4] = 0;
@@ -272,8 +266,10 @@ public void SendHeader()
272266
headerBytes[7] = (byte)Endpoint.Protocol.MinorVersion;
273267
}
274268

275-
_pipeWriter.WriteAsync(headerBytes.AsMemory(0, 8)).AsTask().GetAwaiter().GetResult();
276-
ArrayPool<byte>.Shared.Return(headerBytes);
269+
Memory<byte> pipeMemory = _pipeWriter.GetMemory(8);
270+
headerBytes.CopyTo(pipeMemory.Span);
271+
_pipeWriter.Advance(headerBytes.Length);
272+
_pipeWriter.FlushAsync().AsTask().GetAwaiter().GetResult();
277273
}
278274

279275
public void Write(OutgoingFrame frame)
@@ -283,28 +279,51 @@ public void Write(OutgoingFrame frame)
283279

284280
private async Task ReadLoop()
285281
{
286-
while (!FrameReader.Completion.IsCompleted)
282+
bool allowSync = true;
283+
try
287284
{
288-
// Let's read some bytes
289-
if (!_pipeReader.TryRead(out ReadResult readResult))
285+
while (true)
290286
{
291-
readResult = await _pipeReader.ReadAsync().ConfigureAwait(false);
292-
}
287+
// Let's read some bytes
288+
if (!(allowSync && _pipeReader.TryRead(out ReadResult readResult)))
289+
{
290+
readResult = await _pipeReader.ReadAsync().ConfigureAwait(false);
291+
}
293292

294-
ReadOnlySequence<byte> buffer = readResult.Buffer;
295-
if (buffer.IsEmpty)
296-
{
297-
_frameWriter.Complete(new EndOfStreamException());
298-
return;
299-
}
293+
int handled = 0;
294+
ReadOnlySequence<byte> buffer = readResult.Buffer;
295+
if (!buffer.IsEmpty)
296+
{
297+
handled = ProcessBuffer(ref buffer);
298+
}
300299

301-
while (InboundFrame.TryReadFrame(ref buffer, out InboundFrame frame))
302-
{
303-
_frameWriter.TryWrite(frame);
300+
allowSync = handled != 0;
301+
_pipeReader.AdvanceTo(buffer.Start, buffer.End);
302+
303+
if (handled == 0 && readResult.IsCompleted)
304+
{
305+
break;
306+
}
304307
}
308+
}
309+
catch (Exception e)
310+
{
311+
_frameWriter.TryComplete(e);
312+
}
313+
314+
_frameWriter.TryComplete(new EndOfStreamException());
315+
}
305316

306-
_pipeReader.AdvanceTo(buffer.Start, buffer.End);
317+
private int ProcessBuffer(ref ReadOnlySequence<byte> buffer)
318+
{
319+
int handled = 0;
320+
while (InboundFrame.TryReadFrame(ref buffer, out InboundFrame frame))
321+
{
322+
_frameWriter.TryWrite(frame);
323+
handled++;
307324
}
325+
326+
return handled;
308327
}
309328

310329
private async Task WriteLoop()

projects/RabbitMQ.Client/client/impl/TcpClientAdapter.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ public virtual Task ConnectAsync(IPAddress ep, int port)
4242

4343
public virtual void Close()
4444
{
45-
_sock?.Dispose();
46-
_sock = null;
45+
try { _sock.Shutdown(SocketShutdown.Both); } catch { }
46+
try { _sock.Close(); } catch { }
47+
try { _sock.Dispose(); } catch { }
4748
}
4849

4950
[Obsolete("Override Dispose(bool) instead.")]

0 commit comments

Comments
 (0)