Skip to content

Commit 93cdd66

Browse files
Introduce continuation timeouts
1 parent 1d6eac5 commit 93cdd66

File tree

8 files changed

+104
-11
lines changed

8 files changed

+104
-11
lines changed

projects/client/RabbitMQ.Client/src/client/api/ConnectionFactory.cs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,29 @@ public class ConnectionFactory : ConnectionFactoryBase, IConnectionFactory
172172
/// </summary>
173173
public TimeSpan NetworkRecoveryInterval = TimeSpan.FromSeconds(5);
174174

175+
private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
176+
private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20);
177+
178+
/// <summary>
179+
/// Amount of time protocol handshake operations are allowed to take before
180+
/// timing out.
181+
/// </summary>
182+
public TimeSpan HandshakeContinuationTimeout
183+
{
184+
get { return m_handshakeContinuationTimeout; }
185+
set { m_handshakeContinuationTimeout = value; }
186+
}
187+
188+
/// <summary>
189+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
190+
/// timing out.
191+
/// </summary>
192+
public TimeSpan ContinuationTimeout
193+
{
194+
get { return m_continuationTimeout; }
195+
set { m_continuationTimeout = value; }
196+
}
197+
175198
/// <summary>
176199
/// The port to connect on. <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
177200
/// indicates the default for the protocol should be used.

projects/client/RabbitMQ.Client/src/client/api/IConnectionFactory.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,5 +110,17 @@ public interface IConnectionFactory
110110
/// What task scheduler should consumer dispatcher use.
111111
/// </summary>
112112
TaskScheduler TaskScheduler { get; set; }
113+
114+
/// <summary>
115+
/// Amount of time protocol handshake operations are allowed to take before
116+
/// timing out.
117+
/// </summary>
118+
TimeSpan HandshakeContinuationTimeout { get; set; }
119+
120+
/// <summary>
121+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
122+
/// timing out.
123+
/// </summary>
124+
TimeSpan ContinuationTimeout { get; set; }
113125
}
114126
}

projects/client/RabbitMQ.Client/src/client/api/IModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -611,5 +611,11 @@ void QueueDeclareNoWait(string queue, bool durable, bool exclusive,
611611
/// </remarks>
612612
[AmqpMethodDoNotImplement(null)]
613613
void WaitForConfirmsOrDie(TimeSpan timeout);
614+
615+
/// <summary>
616+
/// Amount of time protocol operations (e.g. <code>queue.declare</code>) are allowed to take before
617+
/// timing out.
618+
/// </summary>
619+
TimeSpan ContinuationTimeout { get; set; }
614620
}
615621
}

projects/client/RabbitMQ.Client/src/client/impl/AutorecoveringModel.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ public IConsumerDispatcher ConsumerDispatcher
7777
get { return m_delegate.ConsumerDispatcher; }
7878
}
7979

80+
public TimeSpan ContinuationTimeout
81+
{
82+
get { return m_delegate.ContinuationTimeout; }
83+
set { m_delegate.ContinuationTimeout = value; }
84+
}
85+
8086
public AutorecoveringModel(AutorecoveringConnection conn, RecoveryAwareModel _delegate)
8187
{
8288
m_connection = conn;

projects/client/RabbitMQ.Client/src/client/impl/Connection.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1204,6 +1204,7 @@ public IModel CreateModel()
12041204
EnsureIsOpen();
12051205
ISession session = CreateSession();
12061206
var model = (IFullModel)Protocol.CreateModel(session, this.ConsumerWorkService);
1207+
model.ContinuationTimeout = m_factory.ContinuationTimeout;
12071208
model._Private_ChannelOpen("");
12081209
return model;
12091210
}

projects/client/RabbitMQ.Client/src/client/impl/ModelBase.cs

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ public abstract class ModelBase : IFullModel, IRecoverable
6363
///sequence. See <see cref="Connection.Open"/> </summary>
6464
public BlockingCell m_connectionStartCell = null;
6565

66+
private TimeSpan m_handshakeContinuationTimeout = TimeSpan.FromSeconds(10);
67+
private TimeSpan m_continuationTimeout = TimeSpan.FromSeconds(20);
68+
6669
public RpcContinuationQueue m_continuationQueue = new RpcContinuationQueue();
6770
public ManualResetEvent m_flowControlBlock = new ManualResetEvent(true);
6871

@@ -106,6 +109,18 @@ protected void Initialise(ISession session)
106109
Session.SessionShutdown += OnSessionShutdown;
107110
}
108111

112+
public TimeSpan HandshakeContinuationTimeout
113+
{
114+
get { return m_handshakeContinuationTimeout; }
115+
set { m_handshakeContinuationTimeout = value; }
116+
}
117+
118+
public TimeSpan ContinuationTimeout
119+
{
120+
get { return m_continuationTimeout; }
121+
set { m_continuationTimeout = value; }
122+
}
123+
109124
public event EventHandler<BasicAckEventArgs> BasicAcks
110125
{
111126
add
@@ -300,8 +315,8 @@ public void Close(ShutdownEventArgs reason, bool abort)
300315
if (SetCloseReason(reason))
301316
{
302317
_Private_ChannelClose(reason.ReplyCode, reason.ReplyText, 0, 0);
303-
}
304-
k.Wait();
318+
}
319+
k.Wait(TimeSpan.FromMilliseconds(10000));
305320
ConsumerDispatcher.Shutdown(this);
306321
}
307322
catch (AlreadyClosedException)
@@ -343,7 +358,7 @@ public string ConnectionOpen(string virtualHost,
343358
// which is a much more suitable exception before connection
344359
// negotiation finishes
345360
}
346-
k.GetReply();
361+
k.GetReply(HandshakeContinuationTimeout);
347362
return k.m_knownHosts;
348363
}
349364

@@ -361,7 +376,7 @@ public ConnectionSecureOrTune ConnectionSecureOk(byte[] response)
361376
// which is a much more suitable exception before connection
362377
// negotiation finishes
363378
}
364-
k.GetReply();
379+
k.GetReply(HandshakeContinuationTimeout);
365380
return k.m_result;
366381
}
367382

@@ -383,7 +398,7 @@ public ConnectionSecureOrTune ConnectionStartOk(IDictionary<string, object> clie
383398
// which is a much more suitable exception before connection
384399
// negotiation finishes
385400
}
386-
k.GetReply();
401+
k.GetReply(HandshakeContinuationTimeout);
387402
return k.m_result;
388403
}
389404

@@ -434,7 +449,7 @@ public MethodBase ModelRpc(MethodBase method, ContentHeaderBase header, byte[] b
434449
{
435450
var k = new SimpleBlockingRpcContinuation();
436451
TransmitAndEnqueue(new Command(method, header, body), k);
437-
return k.GetReply().Method;
452+
return k.GetReply(this.ContinuationTimeout).Method;
438453
}
439454

440455
public void ModelSend(MethodBase method, ContentHeaderBase header, byte[] body)
@@ -1127,7 +1142,7 @@ public void BasicCancel(string consumerTag)
11271142
Enqueue(k);
11281143

11291144
_Private_BasicCancel(consumerTag, false);
1130-
k.GetReply();
1145+
k.GetReply(this.ContinuationTimeout);
11311146
lock (m_consumers)
11321147
{
11331148
m_consumers.Remove(consumerTag);
@@ -1175,7 +1190,7 @@ public string BasicConsume(string queue,
11751190
// the RPC response, but a response is still expected.
11761191
_Private_BasicConsume(queue, consumerTag, noLocal, noAck, exclusive,
11771192
/*nowait:*/ false, arguments);
1178-
k.GetReply();
1193+
k.GetReply(this.ContinuationTimeout);
11791194
string actualConsumerTag = k.m_consumerTag;
11801195

11811196
return actualConsumerTag;
@@ -1187,7 +1202,7 @@ public BasicGetResult BasicGet(string queue,
11871202
var k = new BasicGetRpcContinuation();
11881203
Enqueue(k);
11891204
_Private_BasicGet(queue, noAck);
1190-
k.GetReply();
1205+
k.GetReply(this.ContinuationTimeout);
11911206
return k.m_result;
11921207
}
11931208

@@ -1255,7 +1270,7 @@ public void BasicRecover(bool requeue)
12551270

12561271
Enqueue(k);
12571272
_Private_BasicRecover(requeue);
1258-
k.GetReply();
1273+
k.GetReply(this.ContinuationTimeout);
12591274
}
12601275

12611276
public abstract void BasicRecoverAsync(bool requeue);
@@ -1586,7 +1601,7 @@ private QueueDeclareOk QueueDeclare(string queue, bool passive, bool durable, bo
15861601
var k = new QueueDeclareRpcContinuation();
15871602
Enqueue(k);
15881603
_Private_QueueDeclare(queue, passive, durable, exclusive, autoDelete, false, arguments);
1589-
k.GetReply();
1604+
k.GetReply(this.ContinuationTimeout);
15901605
return k.m_result;
15911606
}
15921607

projects/client/RabbitMQ.Client/src/client/impl/ShutdownContinuation.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,5 +76,10 @@ public virtual ShutdownEventArgs Wait()
7676
{
7777
return (ShutdownEventArgs)m_cell.Value;
7878
}
79+
80+
public ShutdownEventArgs Wait(TimeSpan timeout)
81+
{
82+
return (ShutdownEventArgs)m_cell.GetValue(timeout);
83+
}
7984
}
8085
}

projects/client/RabbitMQ.Client/src/client/impl/SimpleBlockingRpcContinuation.cs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,31 @@ public virtual Command GetReply()
6969
}
7070
}
7171

72+
public virtual Command GetReply(TimeSpan timeout)
73+
{
74+
var result = (Either)m_cell.GetValue(timeout);
75+
switch (result.Alternative)
76+
{
77+
case EitherAlternative.Left:
78+
return (Command)result.Value;
79+
case EitherAlternative.Right:
80+
throw new OperationInterruptedException((ShutdownEventArgs)result.Value);
81+
default:
82+
ReportInvalidInvariant(result);
83+
return null;
84+
}
85+
}
86+
87+
private static void ReportInvalidInvariant(Either result)
88+
{
89+
string error = "Illegal EitherAlternative " + result.Alternative;
90+
#if !(NETFX_CORE)
91+
Trace.Fail(error);
92+
#else
93+
MetroEventSource.Log.Error(error);
94+
#endif
95+
}
96+
7297
public virtual void HandleCommand(Command cmd)
7398
{
7499
m_cell.Value = Either.Left(cmd);

0 commit comments

Comments
 (0)