Skip to content

WIP (Spike) Introduce bound default timeouts for connection closure #950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ public AmqpTcpEndpoint[] KnownHosts
set => Delegate.KnownHosts = value;
}

internal IFrameHandler FrameHandler => Delegate.FrameHandler;

public int LocalPort => Delegate.LocalPort;

public ProtocolBase Protocol => Delegate.Protocol;
Expand Down Expand Up @@ -249,8 +251,6 @@ private bool TryPerformAutomaticRecovery()
return false;
}

public void Close(ShutdownEventArgs reason) => Delegate.Close(reason);

public RecoveryAwareModel CreateNonRecoveringModel()
{
ISession session = Delegate.CreateSession();
Expand Down Expand Up @@ -517,6 +517,16 @@ public void Close(ushort reasonCode, string reasonText)
}
}

public void Close(ShutdownEventArgs reason)
{
ThrowIfDisposed();
StopRecoveryLoop();
if (_delegate.IsOpen)
{
_delegate.Close(reason);
}
}

///<summary>API-side invocation of connection.close with timeout.</summary>
public void Close(TimeSpan timeout)
{
Expand Down
44 changes: 33 additions & 11 deletions projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ internal sealed class Connection : IConnection
.GetCustomAttribute<AssemblyInformationalVersionAttribute>()
.InformationalVersion;

public static readonly TimeSpan DefaultConnectionClosureTimeout;
public static readonly TimeSpan DefaultConnectionAbortTimeout;

static Connection()
{
DefaultConnectionClosureTimeout = TimeSpan.FromSeconds(30);
DefaultConnectionAbortTimeout = TimeSpan.FromSeconds(2);
}

// true if we haven't finished connection negotiation.
// In this state socket exceptions are treated as fatal connection
// errors, otherwise as read timeouts
Expand Down Expand Up @@ -211,6 +220,11 @@ public TimeSpan Heartbeat

public AmqpTcpEndpoint[] KnownHosts { get; set; }

internal IFrameHandler FrameHandler
{
get { return _frameHandler; }
}

public EndPoint LocalEndPoint => _frameHandler.LocalEndPoint;

public int LocalPort => _frameHandler.LocalPort;
Expand Down Expand Up @@ -250,7 +264,7 @@ public void Abort(ushort reasonCode, string reasonText, ShutdownInitiator initia

public void Close(ShutdownEventArgs reason)
{
Close(reason, false, Timeout.InfiniteTimeSpan);
Close(reason, false, DefaultConnectionClosureTimeout);
}

///<summary>Try to close connection in a graceful way</summary>
Expand All @@ -265,7 +279,7 @@ public void Close(ShutdownEventArgs reason)
///</para>
///<para>
///Timeout determines how much time internal close operations should be given
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
///to complete.
///</para>
///</remarks>
public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
Expand All @@ -284,9 +298,12 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)

try
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
if(_running && _frameHandler.IsOpen())
{
// Try to send connection.close
// Wait for CloseOk in the MainLoop
_session0.Transmit(ConnectionCloseWrapper(reason.ReplyCode, reason.ReplyText));
}
}
catch (AlreadyClosedException)
{
Expand Down Expand Up @@ -325,7 +342,9 @@ public void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
bool receivedSignal = _appContinuation.Wait(timeout);
if (!receivedSignal)
{
_frameHandler.Close();
if(_frameHandler.IsOpen()) {
_frameHandler.Close();
}
}
}

Expand Down Expand Up @@ -397,7 +416,10 @@ public void FinishClose()
_closed = true;
MaybeStopHeartbeatTimers();

_frameHandler.Close();
if(_frameHandler.IsOpen())
{
_frameHandler.Close();
}
_model0.SetCloseReason(_closeReason);
_model0.FinishClose();
}
Expand Down Expand Up @@ -921,13 +943,13 @@ public void UpdateSecret(string newSecret, string reason)
///<summary>API-side invocation of connection abort.</summary>
public void Abort()
{
Abort(Timeout.InfiniteTimeSpan);
Abort(DefaultConnectionAbortTimeout);
}

///<summary>API-side invocation of connection abort.</summary>
public void Abort(ushort reasonCode, string reasonText)
{
Abort(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
Abort(reasonCode, reasonText, DefaultConnectionAbortTimeout);
}

///<summary>API-side invocation of connection abort with timeout.</summary>
Expand All @@ -945,13 +967,13 @@ public void Abort(ushort reasonCode, string reasonText, TimeSpan timeout)
///<summary>API-side invocation of connection.close.</summary>
public void Close()
{
Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan);
Close(Constants.ReplySuccess, "Goodbye", DefaultConnectionClosureTimeout);
}

///<summary>API-side invocation of connection.close.</summary>
public void Close(ushort reasonCode, string reasonText)
{
Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan);
Close(reasonCode, reasonText, DefaultConnectionClosureTimeout);
}

///<summary>API-side invocation of connection.close with timeout.</summary>
Expand Down
2 changes: 2 additions & 0 deletions projects/RabbitMQ.Client/client/impl/IFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ internal interface IFrameHandler
void SendHeader();

void Write(ReadOnlyMemory<byte> memory);

bool IsOpen();
}
}
15 changes: 15 additions & 0 deletions projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,21 @@ public TimeSpan WriteTimeout
}
}

public bool IsOpen()
{
if(_closed)
{
return false;
}

if(_socket == null)
{
return false;
}

return _socket.Connected;
}

public void Close()
{
lock (_semaphore)
Expand Down
45 changes: 45 additions & 0 deletions projects/Unit/TestConnectionShutdown.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,57 @@
using NUnit.Framework;

using RabbitMQ.Client.Impl;
using RabbitMQ.Client.Framing.Impl;


namespace RabbitMQ.Client.Unit
{
[TestFixture]
public class TestConnectionShutdown : IntegrationFixture
{

[Test]
public void TestCleanClosureWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

var latch = new ManualResetEventSlim(false);
_conn.Close(TimeSpan.FromSeconds(4));
Wait(latch, TimeSpan.FromSeconds(5));
}

[Test]
public void TestAbortWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

var latch = new ManualResetEventSlim(false);
_conn.Abort();
// default Connection.Abort() timeout and then some
Wait(latch, Connection.DefaultConnectionAbortTimeout + TimeSpan.FromSeconds(1));
}

public void TestDisposedWithSocketClosedOutOfBand()
{
_conn = CreateAutorecoveringConnection();
_model = _conn.CreateModel();

var c = (AutorecoveringConnection)_conn;
c.FrameHandler.Close();

var latch = new ManualResetEventSlim(false);
_conn.Dispose();
Wait(latch, TimeSpan.FromSeconds(3));
}

[Test]
public void TestShutdownSignalPropagationToChannels()
{
Expand Down