Skip to content

Commit fd4e242

Browse files
committed
Added tests for topology recovery exception handler
1 parent e723327 commit fd4e242

File tree

4 files changed

+232
-17
lines changed

4 files changed

+232
-17
lines changed

projects/RabbitMQ.Client/client/api/TopologyRecoveryExceptionHandler.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ public class TopologyRecoveryExceptionHandler
1616
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
1717
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
1818
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19-
private Action<IRecordedExchange, Exception> _exchangeRecoveryExceptionHandler;
20-
private Action<IRecordedQueue, Exception> _queueRecoveryExceptionHandler;
21-
private Action<IRecordedBinding, Exception> _bindingRecoveryExceptionHandler;
22-
private Action<IRecordedConsumer, Exception> _consumerRecoveryExceptionHandler;
19+
private Action<IRecordedExchange, Exception, IConnection> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception, IConnection> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception, IConnection> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception, IConnection> _consumerRecoveryExceptionHandler;
2323

2424
/// <summary>
2525
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
@@ -92,7 +92,7 @@ public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionConditi
9292
/// <summary>
9393
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
9494
/// </summary>
95-
public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
95+
public Action<IRecordedExchange, Exception, IConnection> ExchangeRecoveryExceptionHandler
9696
{
9797
get => _exchangeRecoveryExceptionHandler;
9898

@@ -108,7 +108,7 @@ public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
108108
/// <summary>
109109
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110110
/// </summary>
111-
public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
111+
public Action<IRecordedQueue, Exception, IConnection> QueueRecoveryExceptionHandler
112112
{
113113
get => _queueRecoveryExceptionHandler;
114114

@@ -124,7 +124,7 @@ public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
124124
/// <summary>
125125
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126126
/// </summary>
127-
public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
127+
public Action<IRecordedBinding, Exception, IConnection> BindingRecoveryExceptionHandler
128128
{
129129
get => _bindingRecoveryExceptionHandler;
130130

@@ -141,7 +141,7 @@ public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
141141
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142142
/// Is only called when the exception did not cause the consumer's channel to close.
143143
/// </summary>
144-
public Action<IRecordedConsumer, Exception> ConsumerRecoveryExceptionHandler
144+
public Action<IRecordedConsumer, Exception, IConnection> ConsumerRecoveryExceptionHandler
145145
{
146146
get => _consumerRecoveryExceptionHandler;
147147

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private void RecoverExchanges(RecoveryChannelFactory recoveryChannelFactory)
224224
if (_config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null
225225
&& _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(recordedExchange, ex))
226226
{
227-
_config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(recordedExchange, ex);
227+
_config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(recordedExchange, ex, this);
228228
}
229229
else
230230
{
@@ -270,7 +270,7 @@ private void RecoverQueues(RecoveryChannelFactory recoveryChannelFactory)
270270
if (_config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null
271271
&& _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(recordedQueue, ex))
272272
{
273-
_config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(recordedQueue, ex);
273+
_config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(recordedQueue, ex, this);
274274
}
275275
else
276276
{
@@ -293,7 +293,7 @@ private void RecoverBindings(RecoveryChannelFactory recoveryChannelFactory)
293293
if (_config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null
294294
&& _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(binding, ex))
295295
{
296-
_config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(binding, ex);
296+
_config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(binding, ex, this);
297297
}
298298
else
299299
{
@@ -330,7 +330,7 @@ internal void RecoverConsumers(AutorecoveringChannel channelToRecover, IChannel
330330
if (_config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null
331331
&& _config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(consumer, ex))
332332
{
333-
_config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(consumer, ex);
333+
_config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(consumer, ex, this);
334334
}
335335
else
336336
{

projects/Unit/Fixtures.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,18 @@ internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyReco
175175
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
176176
}
177177

178+
internal AutorecoveringConnection CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(TopologyRecoveryExceptionHandler handler)
179+
{
180+
var cf = new ConnectionFactory
181+
{
182+
AutomaticRecoveryEnabled = true,
183+
TopologyRecoveryEnabled = true,
184+
TopologyRecoveryExceptionHandler = handler
185+
};
186+
187+
return (AutorecoveringConnection)cf.CreateConnection($"{_testDisplayName}:{Guid.NewGuid()}");
188+
}
189+
178190
internal IConnection CreateConnectionWithContinuationTimeout(bool automaticRecoveryEnabled, TimeSpan continuationTimeout)
179191
{
180192
var cf = new ConnectionFactory

projects/Unit/TestConnectionRecovery.cs

Lines changed: 208 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,8 +1182,8 @@ public void TestTopologyRecoveryConsumerFilter()
11821182
var exchange = "topology.recovery.exchange";
11831183
var queueWithRecoveredConsumer = "topology.recovery.queue.1";
11841184
var queueWithIgnoredConsumer = "topology.recovery.queue.2";
1185-
var binding1 = "recovered.binding";
1186-
var binding2 = "filtered.binding";
1185+
var binding1 = "recovered.binding.1";
1186+
var binding2 = "recovered.binding.2";
11871187

11881188
ch.ExchangeDeclare(exchange, "direct");
11891189
ch.QueueDeclare(queueWithRecoveredConsumer, false, false, false, null);
@@ -1278,6 +1278,209 @@ public void TestTopologyRecoveryDefaultFilterRecoversAllEntities()
12781278
Assert.True(consumerLatch2.Wait(TimeSpan.FromSeconds(5)));
12791279
}
12801280

1281+
[Fact]
1282+
public void TestTopologyRecoveryQueueExceptionHandler()
1283+
{
1284+
var changedQueueArguments = new Dictionary<string, object>
1285+
{
1286+
{ Headers.XMaxPriority, 20 }
1287+
};
1288+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1289+
{
1290+
QueueRecoveryExceptionCondition = (rq, ex) =>
1291+
{
1292+
return rq.Name.Contains("exception")
1293+
&& ex is OperationInterruptedException operationInterruptedException
1294+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed;
1295+
},
1296+
QueueRecoveryExceptionHandler = (rq, ex, connection) =>
1297+
{
1298+
using (var model = connection.CreateModel())
1299+
{
1300+
model.QueueDeclare(rq.Name, false, false, false, changedQueueArguments);
1301+
}
1302+
}
1303+
};
1304+
var latch = new ManualResetEventSlim(false);
1305+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1306+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1307+
IModel ch = conn.CreateModel();
1308+
1309+
var queueToRecoverWithException = "recovery.exception.queue";
1310+
var queueToRecoverSuccessfully = "successfully.recovered.queue";
1311+
ch.QueueDeclare(queueToRecoverWithException, false, false, false, null);
1312+
ch.QueueDeclare(queueToRecoverSuccessfully, false, false, false, null);
1313+
1314+
_model.QueueDelete(queueToRecoverSuccessfully);
1315+
_model.QueueDelete(queueToRecoverWithException);
1316+
_model.QueueDeclare(queueToRecoverWithException, false, false, false, changedQueueArguments);
1317+
1318+
CloseAndWaitForRecovery(conn);
1319+
Wait(latch);
1320+
1321+
Assert.True(ch.IsOpen);
1322+
AssertQueueRecovery(ch, queueToRecoverSuccessfully, false);
1323+
AssertQueueRecovery(ch, queueToRecoverWithException, false, changedQueueArguments);
1324+
1325+
//Cleanup
1326+
_model.QueueDelete(queueToRecoverWithException);
1327+
}
1328+
1329+
[Fact]
1330+
public void TestTopologyRecoveryExchangeExceptionHandler()
1331+
{
1332+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1333+
{
1334+
ExchangeRecoveryExceptionCondition = (re, ex) =>
1335+
{
1336+
return re.Name.Contains("exception")
1337+
&& ex is OperationInterruptedException operationInterruptedException
1338+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.PreconditionFailed;
1339+
},
1340+
ExchangeRecoveryExceptionHandler = (re, ex, connection) =>
1341+
{
1342+
using (var model = connection.CreateModel())
1343+
{
1344+
model.ExchangeDeclare(re.Name, "topic", false, false);
1345+
}
1346+
}
1347+
};
1348+
var latch = new ManualResetEventSlim(false);
1349+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1350+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1351+
IModel ch = conn.CreateModel();
1352+
1353+
var exchangeToRecoverWithException = "recovery.exception.exchange";
1354+
var exchangeToRecoverSuccessfully = "successfully.recovered.exchange";
1355+
ch.ExchangeDeclare(exchangeToRecoverWithException, "direct", false, false);
1356+
ch.ExchangeDeclare(exchangeToRecoverSuccessfully, "direct", false, false);
1357+
1358+
_model.ExchangeDelete(exchangeToRecoverSuccessfully);
1359+
_model.ExchangeDelete(exchangeToRecoverWithException);
1360+
_model.ExchangeDeclare(exchangeToRecoverWithException, "topic", false, false);
1361+
1362+
CloseAndWaitForRecovery(conn);
1363+
Wait(latch);
1364+
1365+
Assert.True(ch.IsOpen);
1366+
AssertExchangeRecovery(ch, exchangeToRecoverSuccessfully);
1367+
AssertExchangeRecovery(ch, exchangeToRecoverWithException);
1368+
1369+
//Cleanup
1370+
_model.ExchangeDelete(exchangeToRecoverWithException);
1371+
}
1372+
1373+
[Fact]
1374+
public void TestTopologyRecoveryBindingExceptionHandler()
1375+
{
1376+
var exchange = "topology.recovery.exchange";
1377+
var queueWithExceptionBinding = "recovery.exception.queue";
1378+
var bindingToRecoverWithException = "recovery.exception.binding";
1379+
1380+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1381+
{
1382+
BindingRecoveryExceptionCondition = (b, ex) =>
1383+
{
1384+
return b.RoutingKey.Contains("exception")
1385+
&& ex is OperationInterruptedException operationInterruptedException
1386+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound;
1387+
},
1388+
BindingRecoveryExceptionHandler = (b, ex, connection) =>
1389+
{
1390+
using (var model = connection.CreateModel())
1391+
{
1392+
model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
1393+
model.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1394+
}
1395+
}
1396+
};
1397+
var latch = new ManualResetEventSlim(false);
1398+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1399+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1400+
IModel ch = conn.CreateModel();
1401+
1402+
var queueWithRecoveredBinding = "successfully.recovered.queue";
1403+
var bindingToRecoverSuccessfully = "successfully.recovered.binding";
1404+
1405+
_model.QueueDeclare(queueWithExceptionBinding, false, false, false, null);
1406+
1407+
ch.ExchangeDeclare(exchange, "direct");
1408+
ch.QueueDeclare(queueWithRecoveredBinding, false, false, false, null);
1409+
ch.QueueBind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
1410+
ch.QueueBind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1411+
ch.QueuePurge(queueWithRecoveredBinding);
1412+
ch.QueuePurge(queueWithExceptionBinding);
1413+
1414+
_model.QueueUnbind(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully);
1415+
_model.QueueUnbind(queueWithExceptionBinding, exchange, bindingToRecoverWithException);
1416+
_model.QueueDelete(queueWithExceptionBinding);
1417+
1418+
CloseAndWaitForRecovery(conn);
1419+
Wait(latch);
1420+
1421+
Assert.True(ch.IsOpen);
1422+
Assert.True(SendAndConsumeMessage(queueWithRecoveredBinding, exchange, bindingToRecoverSuccessfully));
1423+
Assert.False(SendAndConsumeMessage(queueWithExceptionBinding, exchange, bindingToRecoverWithException));
1424+
}
1425+
1426+
[Fact]
1427+
public void TestTopologyRecoveryConsumerExceptionHandler()
1428+
{
1429+
var queueWithExceptionConsumer = "recovery.exception.queue";
1430+
1431+
var exceptionHandler = new TopologyRecoveryExceptionHandler
1432+
{
1433+
ConsumerRecoveryExceptionCondition = (c, ex) =>
1434+
{
1435+
return c.ConsumerTag.Contains("exception")
1436+
&& ex is OperationInterruptedException operationInterruptedException
1437+
&& operationInterruptedException.ShutdownReason.ReplyCode == Constants.NotFound;
1438+
},
1439+
ConsumerRecoveryExceptionHandler = (c, ex, connection) =>
1440+
{
1441+
using (var model = connection.CreateModel())
1442+
{
1443+
model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
1444+
model.BasicConsume(queueWithExceptionConsumer, true, c.ConsumerTag, c.Consumer);
1445+
}
1446+
}
1447+
};
1448+
var latch = new ManualResetEventSlim(false);
1449+
AutorecoveringConnection conn = CreateAutorecoveringConnectionWithTopologyRecoveryExceptionHandler(exceptionHandler);
1450+
conn.RecoverySucceeded += (source, ea) => latch.Set();
1451+
IModel ch = conn.CreateModel();
1452+
ch.ConfirmSelect();
1453+
1454+
_model.QueueDeclare(queueWithExceptionConsumer, false, false, false, null);
1455+
_model.QueuePurge(queueWithExceptionConsumer);
1456+
1457+
var recoverLatch = new ManualResetEventSlim(false);
1458+
var consumerToRecover = new EventingBasicConsumer(ch);
1459+
consumerToRecover.Received += (source, ea) => recoverLatch.Set();
1460+
ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover);
1461+
1462+
_model.QueueDelete(queueWithExceptionConsumer);
1463+
1464+
CloseAndWaitForRecovery(conn);
1465+
Wait(latch);
1466+
1467+
Assert.True(ch.IsOpen);
1468+
1469+
ch.BasicPublish("", queueWithExceptionConsumer, Encoding.UTF8.GetBytes("test message"));
1470+
1471+
Assert.True(recoverLatch.Wait(TimeSpan.FromSeconds(5)));
1472+
1473+
try
1474+
{
1475+
ch.BasicConsume(queueWithExceptionConsumer, true, "exception.consumer", consumerToRecover);
1476+
Assert.Fail("Expected an exception");
1477+
}
1478+
catch (OperationInterruptedException e)
1479+
{
1480+
AssertShutdownError(e.ShutdownReason, 530); // NOT_ALLOWED - not allowed to reuse consumer tag
1481+
}
1482+
}
1483+
12811484
internal bool SendAndConsumeMessage(string queue, string exchange, string routingKey)
12821485
{
12831486
using (var ch = _conn.CreateModel())
@@ -1313,15 +1516,15 @@ internal void AssertQueueRecovery(IChannel m, string q)
13131516
AssertQueueRecovery(m, q, true);
13141517
}
13151518

1316-
internal void AssertQueueRecovery(IChannel m, string q, bool exclusive)
1519+
internal void AssertQueueRecovery(IChannel m, string q, bool exclusive, IDictionary<string, object> arguments = null)
13171520
{
13181521
m.ConfirmSelect();
13191522
m.QueueDeclarePassive(q);
1320-
QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, null);
1523+
QueueDeclareOk ok1 = m.QueueDeclare(q, false, exclusive, false, arguments);
13211524
Assert.Equal(0u, ok1.MessageCount);
13221525
m.BasicPublish("", q, _messageBody);
13231526
Assert.True(WaitForConfirms(m));
1324-
QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, null);
1527+
QueueDeclareOk ok2 = m.QueueDeclare(q, false, exclusive, false, arguments);
13251528
Assert.Equal(1u, ok2.MessageCount);
13261529
}
13271530

0 commit comments

Comments
 (0)