Skip to content

Commit a445d1d

Browse files
committed
Port rabbitmq/rabbitmq-dotnet-client #950 to main
#950 Fixes #938
1 parent 8588c9e commit a445d1d

File tree

7 files changed

+111
-17
lines changed

7 files changed

+111
-17
lines changed

projects/RabbitMQ.Client/client/api/IConnectionExtensions.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public static class IConnectionExtensions
1919
/// </remarks>
2020
public static void Close(this IConnection connection)
2121
{
22-
connection.Close(Constants.ReplySuccess, "Goodbye", Timeout.InfiniteTimeSpan, false);
22+
connection.Close(Constants.ReplySuccess, "Goodbye", TimeSpan.FromSeconds(30), false);
2323
}
2424

2525
/// <summary>
@@ -37,7 +37,7 @@ public static void Close(this IConnection connection)
3737
/// </remarks>
3838
public static void Close(this IConnection connection, ushort reasonCode, string reasonText)
3939
{
40-
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, false);
40+
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(30), false);
4141
}
4242

4343
/// <summary>
@@ -93,7 +93,7 @@ public static void Close(this IConnection connection, ushort reasonCode, string
9393
/// </remarks>
9494
public static void Abort(this IConnection connection)
9595
{
96-
connection.Close(Constants.ReplySuccess, "Connection close forced", Timeout.InfiniteTimeSpan, true);
96+
connection.Close(Constants.ReplySuccess, "Connection close forced", TimeSpan.FromSeconds(5), true);
9797
}
9898

9999
/// <summary>
@@ -111,7 +111,7 @@ public static void Abort(this IConnection connection)
111111
/// </remarks>
112112
public static void Abort(this IConnection connection, ushort reasonCode, string reasonText)
113113
{
114-
connection.Close(reasonCode, reasonText, Timeout.InfiniteTimeSpan, true);
114+
connection.Close(reasonCode, reasonText, TimeSpan.FromSeconds(5), true);
115115
}
116116

117117
/// <summary>

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ public RecoveryAwareModel CreateNonRecoveringModel()
163163
public override string ToString()
164164
=> $"AutorecoveringConnection({InnerConnection.Id},{Endpoint},{GetHashCode()})";
165165

166+
internal IFrameHandler FrameHandler => InnerConnection.FrameHandler;
167+
166168
internal void Init()
167169
{
168170
Init(_factory.EndpointResolverFactory(new List<AmqpTcpEndpoint> { _factory.Endpoint }));

projects/RabbitMQ.Client/client/impl/Connection.cs

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,6 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
108108
public bool IsOpen => CloseReason is null;
109109

110110
public int LocalPort => _frameHandler.LocalPort;
111-
112-
///<summary>Another overload of a Protocol property, useful
113-
///for exposing a tighter type.</summary>
114-
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;
115-
116111
public int RemotePort => _frameHandler.RemotePort;
117112

118113
public IDictionary<string, object?>? ServerProperties { get; private set; }
@@ -123,6 +118,16 @@ public Connection(IConnectionFactory factory, IFrameHandler frameHandler, string
123118
///<summary>Explicit implementation of IConnection.Protocol.</summary>
124119
IProtocol IConnection.Protocol => Endpoint.Protocol;
125120

121+
///<summary>Another overload of a Protocol property, useful
122+
///for exposing a tighter type.</summary>
123+
internal ProtocolBase Protocol => (ProtocolBase)Endpoint.Protocol;
124+
125+
///<summary>Used for testing only.</summary>
126+
internal IFrameHandler FrameHandler
127+
{
128+
get { return _frameHandler; }
129+
}
130+
126131
public event EventHandler<CallbackExceptionEventArgs> CallbackException
127132
{
128133
add => _callbackExceptionWrapper.AddHandler(value);
@@ -259,7 +264,7 @@ public void Close(ushort reasonCode, string reasonText, TimeSpan timeout, bool a
259264
///</para>
260265
///<para>
261266
///Timeout determines how much time internal close operations should be given
262-
///to complete. System.Threading.Timeout.InfiniteTimeSpan value means infinity.
267+
///to complete.
263268
///</para>
264269
///</remarks>
265270
internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
@@ -279,8 +284,11 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
279284
try
280285
{
281286
// Try to send connection.close wait for CloseOk in the MainLoop
282-
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
283-
_session0.Transmit(ref cmd);
287+
if (!_closed)
288+
{
289+
var cmd = new ConnectionClose(reason.ReplyCode, reason.ReplyText, 0, 0);
290+
_session0.Transmit(ref cmd);
291+
}
284292
}
285293
catch (AlreadyClosedException)
286294
{
@@ -320,12 +328,18 @@ internal void Close(ShutdownEventArgs reason, bool abort, TimeSpan timeout)
320328
{
321329
if (!_mainLoopTask.Wait(timeout))
322330
{
323-
_frameHandler.Close();
331+
if (_frameHandler.IsOpen)
332+
{
333+
_frameHandler.Close();
334+
}
324335
}
325336
}
326337
catch (AggregateException)
327338
{
328-
_frameHandler.Close();
339+
if (_frameHandler.IsOpen)
340+
{
341+
_frameHandler.Close();
342+
}
329343
}
330344
}
331345

@@ -351,8 +365,10 @@ private void FinishClose()
351365
{
352366
_closed = true;
353367
MaybeStopHeartbeatTimers();
354-
355-
_frameHandler.Close();
368+
if (_frameHandler.IsOpen)
369+
{
370+
_frameHandler.Close();
371+
}
356372
_model0.SetCloseReason(CloseReason);
357373
_model0.FinishClose();
358374
RabbitMqClientEventSource.Log.ConnectionClosed();

projects/RabbitMQ.Client/client/impl/IFrameHandler.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,5 +62,7 @@ internal interface IFrameHandler
6262
void SendHeader();
6363

6464
void Write(ReadOnlyMemory<byte> memory);
65+
66+
bool IsOpen { get; }
6567
}
6668
}

projects/RabbitMQ.Client/client/impl/ModelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -742,7 +742,7 @@ protected void HandleConnectionStart(in IncomingCommand cmd)
742742
if (m_connectionStartCell is null)
743743
{
744744
var reason = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.CommandInvalid, "Unexpected Connection.Start");
745-
Session.Connection.Close(reason, false, Timeout.InfiniteTimeSpan);
745+
Session.Connection.Close(reason, false, TimeSpan.FromSeconds(30));
746746
}
747747

748748
var method = new ConnectionStart(cmd.MethodBytes.Span);

projects/RabbitMQ.Client/client/impl/SocketFrameHandler.cs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,24 @@ public TimeSpan WriteTimeout
194194
}
195195
}
196196

197+
public bool IsOpen
198+
{
199+
get
200+
{
201+
if (_closed)
202+
{
203+
return false;
204+
}
205+
206+
if (_socket == null)
207+
{
208+
return false;
209+
}
210+
211+
return _socket.Connected;
212+
}
213+
}
214+
197215
public void Close()
198216
{
199217
lock (_semaphore)

projects/Unit/TestConnectionShutdown.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
using System.Threading;
3434

3535
using RabbitMQ.Client.Impl;
36+
using RabbitMQ.Client.Framing.Impl;
3637

3738
using Xunit;
3839

@@ -41,6 +42,61 @@ namespace RabbitMQ.Client.Unit
4142

4243
public class TestConnectionShutdown : IntegrationFixture
4344
{
45+
[Fact]
46+
public void TestCleanClosureWithSocketClosedOutOfBand()
47+
{
48+
_conn = CreateAutorecoveringConnection();
49+
_model = _conn.CreateModel();
50+
51+
var latch = new ManualResetEventSlim(false);
52+
_model.ModelShutdown += (model, args) => {
53+
latch.Set();
54+
};
55+
56+
var c = (AutorecoveringConnection)_conn;
57+
c.FrameHandler.Close();
58+
59+
_conn.Close(TimeSpan.FromSeconds(4));
60+
Wait(latch, TimeSpan.FromSeconds(5));
61+
}
62+
63+
[Fact]
64+
public void TestAbortWithSocketClosedOutOfBand()
65+
{
66+
_conn = CreateAutorecoveringConnection();
67+
_model = _conn.CreateModel();
68+
69+
var latch = new ManualResetEventSlim(false);
70+
_model.ModelShutdown += (model, args) => {
71+
latch.Set();
72+
};
73+
74+
var c = (AutorecoveringConnection)_conn;
75+
c.FrameHandler.Close();
76+
77+
_conn.Abort();
78+
// default Connection.Abort() timeout and then some
79+
Wait(latch, TimeSpan.FromSeconds(6));
80+
}
81+
82+
[Fact]
83+
public void TestDisposedWithSocketClosedOutOfBand()
84+
{
85+
_conn = CreateAutorecoveringConnection();
86+
_model = _conn.CreateModel();
87+
88+
var latch = new ManualResetEventSlim(false);
89+
_model.ModelShutdown += (model, args) => {
90+
latch.Set();
91+
};
92+
93+
var c = (AutorecoveringConnection)_conn;
94+
c.FrameHandler.Close();
95+
96+
_conn.Dispose();
97+
Wait(latch, TimeSpan.FromSeconds(3));
98+
}
99+
44100
[Fact]
45101
public void TestShutdownSignalPropagationToChannels()
46102
{

0 commit comments

Comments
 (0)