Skip to content

Commit e723327

Browse files
committed
Added custom exception handlers to topology recovery
1 parent 599f859 commit e723327

File tree

5 files changed

+258
-15
lines changed

5 files changed

+258
-15
lines changed

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public sealed class ConnectionConfig
9292
/// </summary>
9393
public TopologyRecoveryFilter TopologyRecoveryFilter { get; }
9494

95+
/// <summary>
96+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
97+
/// </summary>
98+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; }
99+
95100
/// <summary>
96101
/// Amount of time client will wait for before re-trying to recover connection.
97102
/// </summary>
@@ -133,7 +138,8 @@ public sealed class ConnectionConfig
133138

134139
internal ConnectionConfig(string virtualHost, string userName, string password, IList<IAuthMechanismFactory> authMechanisms,
135140
IDictionary<string, object?> clientProperties, string? clientProvidedName,
136-
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled, TopologyRecoveryFilter topologyRecoveryFilter,
141+
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
142+
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
137143
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
138144
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
139145
Func<AmqpTcpEndpoint, IFrameHandler> frameHandlerFactory)
@@ -148,6 +154,7 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
148154
MaxFrameSize = maxFrameSize;
149155
TopologyRecoveryEnabled = topologyRecoveryEnabled;
150156
TopologyRecoveryFilter = topologyRecoveryFilter;
157+
TopologyRecoveryExceptionHandler = topologyRecoveryExceptionHandler;
151158
NetworkRecoveryInterval = networkRecoveryInterval;
152159
HeartbeatInterval = heartbeatInterval;
153160
ContinuationTimeout = continuationTimeout;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,11 @@ public TimeSpan ContinuationTimeout
265265
/// </summary>
266266
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
267267

268+
/// <summary>
269+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
270+
/// </summary>
271+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler();
272+
268273
/// <summary>
269274
/// Construct a fresh instance, with all fields set to their respective defaults.
270275
/// </summary>
@@ -542,6 +547,7 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
542547
RequestedFrameMax,
543548
TopologyRecoveryEnabled,
544549
TopologyRecoveryFilter,
550+
TopologyRecoveryExceptionHandler,
545551
NetworkRecoveryInterval,
546552
RequestedHeartbeat,
547553
ContinuationTimeout,

projects/RabbitMQ.Client/client/api/IRecordedConsumer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ namespace RabbitMQ.Client
55
#nullable enable
66
public interface IRecordedConsumer
77
{
8+
IBasicConsumer Consumer { get; }
9+
810
string ConsumerTag { get; }
911

1012
string Queue { get; }
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
7+
/// </summary>
8+
public class TopologyRecoveryExceptionHandler
9+
{
10+
private static readonly Func<IRecordedExchange, Exception, bool> s_defaultExchangeExceptionCondition = (e, ex) => true;
11+
private static readonly Func<IRecordedQueue, Exception, bool> s_defaultQueueExceptionCondition = (q, ex) => true;
12+
private static readonly Func<IRecordedBinding, Exception, bool> s_defaultBindingExceptionCondition = (b, ex) => true;
13+
private static readonly Func<IRecordedConsumer, Exception, bool> s_defaultConsumerExceptionCondition = (c, ex) => true;
14+
15+
private Func<IRecordedExchange, Exception, bool> _exchangeRecoveryExceptionCondition;
16+
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
17+
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
18+
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19+
private Action<IRecordedExchange, Exception> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception> _consumerRecoveryExceptionHandler;
23+
24+
/// <summary>
25+
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
26+
/// Default condition applies the exception handler to all exchange recovery exceptions.
27+
/// </summary>
28+
public Func<IRecordedExchange, Exception, bool> ExchangeRecoveryExceptionCondition
29+
{
30+
get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition;
31+
32+
set
33+
{
34+
if (_exchangeRecoveryExceptionCondition != null)
35+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
36+
37+
_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
38+
}
39+
}
40+
41+
/// <summary>
42+
/// Decides which queue recovery exceptions the custom exception handler is applied to.
43+
/// Default condition applies the exception handler to all queue recovery exceptions.
44+
/// </summary>
45+
public Func<IRecordedQueue, Exception, bool> QueueRecoveryExceptionCondition
46+
{
47+
get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition;
48+
49+
set
50+
{
51+
if (_queueRecoveryExceptionCondition != null)
52+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");
53+
54+
_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
55+
}
56+
}
57+
58+
/// <summary>
59+
/// Decides which binding recovery exceptions the custom exception handler is applied to.
60+
/// Default condition applies the exception handler to all binding recovery exceptions.
61+
/// </summary>
62+
public Func<IRecordedBinding, Exception, bool> BindingRecoveryExceptionCondition
63+
{
64+
get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition;
65+
66+
set
67+
{
68+
if (_bindingRecoveryExceptionCondition != null)
69+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
70+
71+
_bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
72+
}
73+
}
74+
75+
/// <summary>
76+
/// Decides which consumer recovery exceptions the custom exception handler is applied to.
77+
/// Default condition applies the exception handler to all consumer recovery exceptions.
78+
/// </summary>
79+
public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionCondition
80+
{
81+
get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition;
82+
83+
set
84+
{
85+
if (_consumerRecoveryExceptionCondition != null)
86+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");
87+
88+
_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
89+
}
90+
}
91+
92+
/// <summary>
93+
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
94+
/// </summary>
95+
public Action<IRecordedExchange, Exception> ExchangeRecoveryExceptionHandler
96+
{
97+
get => _exchangeRecoveryExceptionHandler;
98+
99+
set
100+
{
101+
if (_exchangeRecoveryExceptionHandler != null)
102+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized.");
103+
104+
_exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler));
105+
}
106+
}
107+
108+
/// <summary>
109+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110+
/// </summary>
111+
public Action<IRecordedQueue, Exception> QueueRecoveryExceptionHandler
112+
{
113+
get => _queueRecoveryExceptionHandler;
114+
115+
set
116+
{
117+
if (_queueRecoveryExceptionHandler != null)
118+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized.");
119+
120+
_queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler));
121+
}
122+
}
123+
124+
/// <summary>
125+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126+
/// </summary>
127+
public Action<IRecordedBinding, Exception> BindingRecoveryExceptionHandler
128+
{
129+
get => _bindingRecoveryExceptionHandler;
130+
131+
set
132+
{
133+
if (_bindingRecoveryExceptionHandler != null)
134+
throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized.");
135+
136+
_bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler));
137+
}
138+
}
139+
140+
/// <summary>
141+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142+
/// Is only called when the exception did not cause the consumer's channel to close.
143+
/// </summary>
144+
public Action<IRecordedConsumer, Exception> ConsumerRecoveryExceptionHandler
145+
{
146+
get => _consumerRecoveryExceptionHandler;
147+
148+
set
149+
{
150+
if (_consumerRecoveryExceptionHandler != null)
151+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized.");
152+
153+
_consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler));
154+
}
155+
}
156+
}
157+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs

Lines changed: 85 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,11 @@ private bool TryPerformAutomaticRecovery()
143143
// 2. Recover queues
144144
// 3. Recover bindings
145145
// 4. Recover consumers
146-
using (var recoveryChannel = _innerConnection.CreateChannel())
146+
using (var recoveryChannelFactory = new RecoveryChannelFactory(_innerConnection))
147147
{
148-
RecoverExchanges(recoveryChannel);
149-
RecoverQueues(recoveryChannel);
150-
RecoverBindings(recoveryChannel);
148+
RecoverExchanges(recoveryChannelFactory);
149+
RecoverQueues(recoveryChannelFactory);
150+
RecoverBindings(recoveryChannelFactory);
151151
}
152152

153153
}
@@ -211,28 +211,36 @@ private bool TryRecoverConnectionDelegate()
211211
return false;
212212
}
213213

214-
private void RecoverExchanges(IChannel channel)
214+
private void RecoverExchanges(RecoveryChannelFactory recoveryChannelFactory)
215215
{
216216
foreach (var recordedExchange in _recordedExchanges.Values.Where(x => _config.TopologyRecoveryFilter?.ExchangeFilter(x) ?? true))
217217
{
218218
try
219219
{
220-
recordedExchange.Recover(channel);
220+
recordedExchange.Recover(recoveryChannelFactory.RecoveryChannel);
221221
}
222222
catch (Exception ex)
223223
{
224-
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering exchange '{recordedExchange}'", ex));
224+
if (_config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler != null
225+
&& _config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionCondition(recordedExchange, ex))
226+
{
227+
_config.TopologyRecoveryExceptionHandler.ExchangeRecoveryExceptionHandler(recordedExchange, ex);
228+
}
229+
else
230+
{
231+
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering exchange '{recordedExchange}'", ex));
232+
}
225233
}
226234
}
227235
}
228236

229-
private void RecoverQueues(IChannel channel)
237+
private void RecoverQueues(RecoveryChannelFactory recoveryChannelFactory)
230238
{
231239
foreach (var recordedQueue in _recordedQueues.Values.Where(x => _config.TopologyRecoveryFilter?.QueueFilter(x) ?? true).ToArray())
232240
{
233241
try
234242
{
235-
var newName = recordedQueue.Recover(channel);
243+
var newName = recordedQueue.Recover(recoveryChannelFactory.RecoveryChannel);
236244
var oldName = recordedQueue.Name;
237245

238246
if (oldName != newName)
@@ -259,22 +267,38 @@ private void RecoverQueues(IChannel channel)
259267
}
260268
catch (Exception ex)
261269
{
262-
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering queue '{recordedQueue}'", ex));
270+
if (_config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler != null
271+
&& _config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionCondition(recordedQueue, ex))
272+
{
273+
_config.TopologyRecoveryExceptionHandler.QueueRecoveryExceptionHandler(recordedQueue, ex);
274+
}
275+
else
276+
{
277+
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering queue '{recordedQueue}'", ex));
278+
}
263279
}
264280
}
265281
}
266282

267-
private void RecoverBindings(IChannel channel)
283+
private void RecoverBindings(RecoveryChannelFactory recoveryChannelFactory)
268284
{
269285
foreach (var binding in _recordedBindings.Where(x => _config.TopologyRecoveryFilter?.BindingFilter(x) ?? true))
270286
{
271287
try
272288
{
273-
binding.Recover(channel);
289+
binding.Recover(recoveryChannelFactory.RecoveryChannel);
274290
}
275291
catch (Exception ex)
276292
{
277-
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering binding between {binding.Source} and {binding.Destination}", ex));
293+
if (_config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler != null
294+
&& _config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionCondition(binding, ex))
295+
{
296+
_config.TopologyRecoveryExceptionHandler.BindingRecoveryExceptionHandler(binding, ex);
297+
}
298+
else
299+
{
300+
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering binding between {binding.Source} and {binding.Destination}", ex));
301+
}
278302
}
279303
}
280304
}
@@ -303,7 +327,15 @@ internal void RecoverConsumers(AutorecoveringChannel channelToRecover, IChannel
303327
}
304328
catch (Exception ex)
305329
{
306-
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering consumer {oldTag} on queue {consumer.Queue}", ex));
330+
if (_config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler != null
331+
&& _config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionCondition(consumer, ex))
332+
{
333+
_config.TopologyRecoveryExceptionHandler.ConsumerRecoveryExceptionHandler(consumer, ex);
334+
}
335+
else
336+
{
337+
HandleTopologyRecoveryException(new TopologyRecoveryException($"Caught an exception while recovering consumer {oldTag} on queue {consumer.Queue}", ex));
338+
}
307339
}
308340
}
309341
}
@@ -318,5 +350,44 @@ private void RecoverChannelsAndItsConsumers()
318350
}
319351
}
320352
}
353+
354+
private sealed class RecoveryChannelFactory : IDisposable
355+
{
356+
private readonly IConnection _connection;
357+
private IModel? _recoveryChannel;
358+
359+
public RecoveryChannelFactory(IConnection connection)
360+
{
361+
_connection = connection;
362+
}
363+
364+
public IModel RecoveryChannel
365+
{
366+
get
367+
{
368+
if (_recoveryChannel == null)
369+
{
370+
_recoveryChannel = _connection.CreateModel();
371+
}
372+
373+
if (_recoveryChannel.IsClosed)
374+
{
375+
_recoveryChannel.Dispose();
376+
_recoveryChannel = _connection.CreateModel();
377+
}
378+
379+
return _recoveryChannel;
380+
}
381+
}
382+
383+
public void Dispose()
384+
{
385+
if (_recoveryChannel != null)
386+
{
387+
_recoveryChannel.Close();
388+
_recoveryChannel.Dispose();
389+
}
390+
}
391+
}
321392
}
322393
}

0 commit comments

Comments
 (0)