Skip to content

Commit 0477cdc

Browse files
committed
2 parents e843364 + e65cb60 commit 0477cdc

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+461
-193
lines changed

CHANGELOG.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
## Changes Between 6.2.4 and 6.3.0
2+
3+
GitHub milestone: [`6.3.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/52?closed=1)
4+
5+
## Changes Between 6.2.3 and 6.2.4
6+
7+
GitHub milestone: [`6.2.4`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/55?closed=1)
8+
9+
This release contains some important bug fixes:
10+
11+
* [Fix connection leaks on auto recovery](https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1145)
12+
* [Fix buffer overflow when writing long strings](https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1162)
13+
* [Fix regression resulting in `ObjectDisposedException`](https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1154)
14+
* [Fix regression that could affect consuming after auto recovery](https://github.com/rabbitmq/rabbitmq-dotnet-client/pull/1148)
15+
116
## Changes Between 6.2.1 and 6.2.3
217

318
GitHub milestone: [`6.2.3`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/54?closed=1)

_site

Submodule _site updated 166 files

appveyor.yml

Lines changed: 0 additions & 24 deletions
This file was deleted.

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
using System;
3333
using System.Collections.Generic;
34+
using System.Buffers;
3435
using System.Linq;
3536
using System.Net.Security;
3637
using System.Security.Authentication;
@@ -188,6 +189,16 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
188189

189190
// just here to hold the value that was set through the setter
190191
private Uri _uri;
192+
private ArrayPool<byte> _memoryPool = ArrayPool<byte>.Shared;
193+
194+
/// <summary>
195+
/// The memory pool used for allocating buffers. Default is <see cref="MemoryPool{T}.Shared"/>.
196+
/// </summary>
197+
public ArrayPool<byte> MemoryPool
198+
{
199+
get { return _memoryPool; }
200+
set { _memoryPool = value ?? ArrayPool<byte>.Shared; }
201+
}
191202

192203
/// <summary>
193204
/// Amount of time protocol handshake operations are allowed to take before
@@ -497,7 +508,8 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
497508
else
498509
{
499510
var protocol = new RabbitMQ.Client.Framing.Protocol();
500-
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(CreateFrameHandler), clientProvidedName);
511+
conn = protocol.CreateConnection(this, false, endpointResolver.SelectOne(CreateFrameHandler),
512+
_memoryPool, clientProvidedName);
501513
}
502514
}
503515
catch (Exception e)
@@ -510,7 +522,7 @@ public IConnection CreateConnection(IEndpointResolver endpointResolver, string c
510522

511523
internal IFrameHandler CreateFrameHandler(AmqpTcpEndpoint endpoint)
512524
{
513-
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, SocketFactory,
525+
IFrameHandler fh = Protocols.DefaultProtocol.CreateFrameHandler(endpoint, _memoryPool, SocketFactory,
514526
RequestedConnectionTimeout, SocketReadTimeout, SocketWriteTimeout);
515527
return ConfigureFrameHandler(fh);
516528
}

projects/RabbitMQ.Client/client/api/IAutorecoveringConnection.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,8 @@
3030
//---------------------------------------------------------------------------
3131

3232
using System;
33-
using System.Collections.Generic;
34-
using System.IO;
35-
using System.Threading;
3633

3734
using RabbitMQ.Client.Events;
38-
using RabbitMQ.Client.Exceptions;
3935

4036
namespace RabbitMQ.Client
4137
{

projects/RabbitMQ.Client/client/impl/AsyncConsumerDispatcher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,11 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
4747
IBasicProperties basicProperties,
4848
ReadOnlySpan<byte> body)
4949
{
50-
byte[] bodyBytes = ArrayPool<byte>.Shared.Rent(body.Length);
50+
var pool = _model.Session.Connection.MemoryPool;
51+
byte[] bodyBytes = pool.Rent(body.Length);
5152
Memory<byte> bodyCopy = new Memory<byte>(bodyBytes, 0, body.Length);
5253
body.CopyTo(bodyCopy.Span);
53-
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy));
54+
ScheduleUnlessShuttingDown(new BasicDeliver(consumer, consumerTag, deliveryTag, redelivered, exchange, routingKey, basicProperties, bodyCopy, pool));
5455
}
5556

5657
public void HandleBasicCancelOk(IBasicConsumer consumer, string consumerTag)

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -668,15 +668,22 @@ public void Init(IEndpointResolver endpoints)
668668
Init(fh);
669669
}
670670

671+
internal IFrameHandler FrameHandler
672+
{
673+
get
674+
{
675+
return _delegate.FrameHandler;
676+
}
677+
}
678+
671679
private void Init(IFrameHandler fh)
672680
{
673681
if (_disposed)
674682
{
675683
throw new ObjectDisposedException(GetType().FullName);
676684
}
677685

678-
_delegate = new Connection(_factory, false,
679-
fh, ClientProvidedName);
686+
_delegate = new Connection(_factory, false, fh, _factory.MemoryPool, ClientProvidedName);
680687

681688
_recoveryTask = Task.Run(MainRecoveryLoop);
682689

@@ -1009,7 +1016,7 @@ private bool TryRecoverConnectionDelegate()
10091016
try
10101017
{
10111018
IFrameHandler fh = _endpoints.SelectOne(_factory.CreateFrameHandler);
1012-
_delegate = new Connection(_factory, false, fh, ClientProvidedName);
1019+
_delegate = new Connection(_factory, false, fh, _factory.MemoryPool, ClientProvidedName);
10131020
return true;
10141021
}
10151022
catch (Exception e)

projects/RabbitMQ.Client/client/impl/AutorecoveringModel.cs

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -360,14 +360,7 @@ public bool IsClosed
360360
{
361361
get
362362
{
363-
if (_delegate == null)
364-
{
365-
return false;
366-
}
367-
else
368-
{
369-
return _delegate.IsClosed;
370-
}
363+
return !IsOpen;
371364
}
372365
}
373366

projects/RabbitMQ.Client/client/impl/BasicDeliver.cs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ internal sealed class BasicDeliver : Work
1414
private readonly string _routingKey;
1515
private readonly IBasicProperties _basicProperties;
1616
private readonly ReadOnlyMemory<byte> _body;
17+
private readonly ArrayPool<byte> _bodyOwner;
1718

1819
public override string Context => "HandleBasicDeliver";
1920

@@ -24,7 +25,8 @@ public BasicDeliver(IBasicConsumer consumer,
2425
string exchange,
2526
string routingKey,
2627
IBasicProperties basicProperties,
27-
ReadOnlyMemory<byte> body) : base(consumer)
28+
ReadOnlyMemory<byte> body,
29+
ArrayPool<byte> pool) : base(consumer)
2830
{
2931
_consumerTag = consumerTag;
3032
_deliveryTag = deliveryTag;
@@ -33,6 +35,7 @@ public BasicDeliver(IBasicConsumer consumer,
3335
_routingKey = routingKey;
3436
_basicProperties = basicProperties;
3537
_body = body;
38+
_bodyOwner = pool;
3639
}
3740

3841
protected override Task Execute(IAsyncBasicConsumer consumer)
@@ -50,7 +53,7 @@ public override void PostExecute()
5053
{
5154
if (MemoryMarshal.TryGetArray(_body, out ArraySegment<byte> segment))
5255
{
53-
ArrayPool<byte>.Shared.Return(segment.Array);
56+
_bodyOwner.Return(segment.Array);
5457
}
5558
}
5659
}

projects/RabbitMQ.Client/client/impl/CommandAssembler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public IncomingCommand HandleFrame(in InboundFrame frame)
8888
return IncomingCommand.Empty;
8989
}
9090

91-
var result = new IncomingCommand(_method, _header, _body, _bodyBytes);
91+
var result = new IncomingCommand(_method, _header, _body, _bodyBytes, _protocol.MemoryPool);
9292
Reset();
9393
return result;
9494
}
@@ -123,7 +123,7 @@ private void ParseHeaderFrame(in InboundFrame frame)
123123
_remainingBodyBytes = (int) totalBodyBytes;
124124

125125
// Is returned by IncomingCommand.Dispose in Session.HandleFrame
126-
_bodyBytes = ArrayPool<byte>.Shared.Rent(_remainingBodyBytes);
126+
_bodyBytes = _protocol.MemoryPool.Rent(_remainingBodyBytes);
127127
_body = new Memory<byte>(_bodyBytes, 0, _remainingBodyBytes);
128128
UpdateContentBodyState();
129129
}

projects/RabbitMQ.Client/client/impl/ConcurrentConsumerDispatcher.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
6464
IBasicProperties basicProperties,
6565
ReadOnlySpan<byte> body)
6666
{
67-
byte[] memoryCopyArray = ArrayPool<byte>.Shared.Rent(body.Length);
67+
var pool = _model.Session.Connection.MemoryPool;
68+
byte[] memoryCopyArray = pool.Rent(body.Length);
6869
Memory<byte> memoryCopy = new Memory<byte>(memoryCopyArray, 0, body.Length);
6970
body.CopyTo(memoryCopy.Span);
7071
UnlessShuttingDown(() =>
@@ -90,7 +91,7 @@ public void HandleBasicDeliver(IBasicConsumer consumer,
9091
}
9192
finally
9293
{
93-
ArrayPool<byte>.Shared.Return(memoryCopyArray);
94+
pool.Return(memoryCopyArray);
9495
}
9596
});
9697
}

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ internal sealed class Connection : IConnection
7070
private volatile bool _running = true;
7171
private readonly MainSession _session0;
7272
private SessionManager _sessionManager;
73+
private readonly ArrayPool<byte> _memoryPool = ArrayPool<byte>.Shared;
7374

7475
//
7576
// Heartbeats
@@ -127,6 +128,22 @@ public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHa
127128
}
128129
}
129130

131+
public Connection(IConnectionFactory factory, bool insist, IFrameHandler frameHandler, ArrayPool<byte> memoryPool,
132+
string clientProvidedName = null)
133+
: this(factory, insist, frameHandler, clientProvidedName)
134+
{
135+
if (memoryPool == null)
136+
{
137+
throw new ArgumentNullException(nameof(memoryPool));
138+
}
139+
_memoryPool = memoryPool;
140+
}
141+
142+
internal ArrayPool<byte> MemoryPool
143+
{
144+
get => _memoryPool;
145+
}
146+
130147
public Guid Id { get { return _id; } }
131148

132149
public event EventHandler<CallbackExceptionEventArgs> CallbackException;
@@ -271,7 +288,7 @@ public void Abort(ushort reasonCode, string reasonText, ShutdownInitiator initia
271288

272289
public void Close(ShutdownEventArgs reason)
273290
{
274-
Close(reason, false, Timeout.InfiniteTimeSpan);
291+
Close(reason, false, TimeSpan.FromSeconds(30));
275292
}
276293

277294
///<summary>Try to close connection in a graceful way</summary>
@@ -286,7 +303,7 @@ public void Close(ShutdownEventArgs reason)
286303
///</para>
287304
///<para>
288305
///Timeout determines how much time internal close operations should be given
289-
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
306+
///to complete.
290307
///</para>
291308
///</remarks>
292309
public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
@@ -307,7 +324,10 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
307324
{
308325
// Try to send connection.close
309326
// Wait for CloseOk in the MainLoop
310-
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
327+
if (!_closed)
328+
{
329+
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
330+
}
311331
}
312332
catch (AlreadyClosedException)
313333
{
@@ -453,9 +473,12 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
453473
_session0.SetSessionClosing(false);
454474
try
455475
{
456-
_session0.Transmit(ConnectionCloseWrapper(
457-
hpe.ShutdownReason.ReplyCode,
458-
hpe.ShutdownReason.ReplyText));
476+
if (!_closed)
477+
{
478+
_session0.Transmit(ConnectionCloseWrapper(
479+
hpe.ShutdownReason.ReplyCode,
480+
hpe.ShutdownReason.ReplyText));
481+
}
459482
return true;
460483
}
461484
catch (IOException ioe)
@@ -902,7 +925,7 @@ public void HeartbeatWriteTimerCallback(object state)
902925
{
903926
if (!_closed)
904927
{
905-
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame());
928+
Write(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame(MemoryPool));
906929
_heartbeatWriteTimer?.Change((int)_heartbeatTimeSpan.TotalMilliseconds, Timeout.Infinite);
907930
}
908931
}
@@ -939,7 +962,7 @@ public override string ToString()
939962
return string.Format("Connection({0},{1})", _id, Endpoint);
940963
}
941964

942-
public void Write(Memory<byte> memory)
965+
public void Write(ReadOnlyMemory<byte> memory)
943966
{
944967
_frameHandler.Write(memory);
945968
}
@@ -952,13 +975,13 @@ public void UpdateSecret(string newSecret, string reason)
952975
///<summary>API-side invocation of connection abort.</summary>
953976
public void Abort()
954977
{
955-
Abort(Timeout.InfiniteTimeSpan);
978+
Abort(TimeSpan.FromSeconds(5));
956979
}
957980

958981
///<summary>API-side invocation of connection abort.</summary>
959982
public void Abort(ushort reasonCode, string reasonText)
960983
{
961-
Abort(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
984+
Abort(reasonCode, reasonText, TimeSpan.FromSeconds(5));
962985
}
963986

964987
///<summary>API-side invocation of connection abort with timeout.</summary>
@@ -976,13 +999,13 @@ public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
976999
///<summary>API-side invocation of connection.close.</summary>
9771000
public void Close()
9781001
{
979-
Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan);
1002+
Close(Constants.ReplySuccess, "Goodbye", TimeSpan.FromSeconds(30));
9801003
}
9811004

9821005
///<summary>API-side invocation of connection.close.</summary>
9831006
public void Close(ushort reasonCode, string reasonText)
9841007
{
985-
Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
1008+
Close(reasonCode, reasonText, TimeSpan.FromSeconds(30));
9861009
}
9871010

9881011
///<summary>API-side invocation of connection.close with timeout.</summary>
@@ -1058,7 +1081,16 @@ internal OutgoingCommand ChannelCloseWrapper(ushort reasonCode, string reasonTex
10581081
return request;
10591082
}
10601083

1061-
void StartAndTune()
1084+
///<summary>Used for testing only.</summary>
1085+
internal IFrameHandler FrameHandler
1086+
{
1087+
get
1088+
{
1089+
return _frameHandler;
1090+
}
1091+
}
1092+
1093+
private void StartAndTune()
10621094
{
10631095
var connectionStartCell = new BlockingCell<ConnectionStartDetails>();
10641096
_model0.m_connectionStartCell = connectionStartCell;

0 commit comments

Comments
 (0)