Skip to content

Commit 857f3e7

Browse files
authored
Merge pull request #1312 from rosca-sabina/feature/658-add-filtering-to-topology-recovery
Add custom filtering and exception handling to topology recovery
2 parents d457b01 + 14845da commit 857f3e7

17 files changed

+1048
-39
lines changed

projects/RabbitMQ.Client/client/api/ConnectionConfig.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,17 @@ public sealed class ConnectionConfig
8686
/// </summary>
8787
public bool TopologyRecoveryEnabled { get; }
8888

89+
/// <summary>
90+
/// Filter to include/exclude entities from topology recovery.
91+
/// Default filter includes all entities in topology recovery.
92+
/// </summary>
93+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; }
94+
95+
/// <summary>
96+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
97+
/// </summary>
98+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; }
99+
89100
/// <summary>
90101
/// Amount of time client will wait for before re-trying to recover connection.
91102
/// </summary>
@@ -128,6 +139,7 @@ public sealed class ConnectionConfig
128139
internal ConnectionConfig(string virtualHost, string userName, string password, IList<IAuthMechanismFactory> authMechanisms,
129140
IDictionary<string, object?> clientProperties, string? clientProvidedName,
130141
ushort maxChannelCount, uint maxFrameSize, bool topologyRecoveryEnabled,
142+
TopologyRecoveryFilter topologyRecoveryFilter, TopologyRecoveryExceptionHandler topologyRecoveryExceptionHandler,
131143
TimeSpan networkRecoveryInterval, TimeSpan heartbeatInterval, TimeSpan continuationTimeout, TimeSpan handshakeContinuationTimeout, TimeSpan requestedConnectionTimeout,
132144
bool dispatchConsumersAsync, int dispatchConsumerConcurrency,
133145
Func<AmqpTcpEndpoint, IFrameHandler> frameHandlerFactory)
@@ -141,6 +153,8 @@ internal ConnectionConfig(string virtualHost, string userName, string password,
141153
MaxChannelCount = maxChannelCount;
142154
MaxFrameSize = maxFrameSize;
143155
TopologyRecoveryEnabled = topologyRecoveryEnabled;
156+
TopologyRecoveryFilter = topologyRecoveryFilter;
157+
TopologyRecoveryExceptionHandler = topologyRecoveryExceptionHandler;
144158
NetworkRecoveryInterval = networkRecoveryInterval;
145159
HeartbeatInterval = heartbeatInterval;
146160
ContinuationTimeout = continuationTimeout;

projects/RabbitMQ.Client/client/api/ConnectionFactory.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,17 @@ public TimeSpan ContinuationTimeout
259259
/// </summary>
260260
public bool TopologyRecoveryEnabled { get; set; } = true;
261261

262+
/// <summary>
263+
/// Filter to include/exclude entities from topology recovery.
264+
/// Default filter includes all entities in topology recovery.
265+
/// </summary>
266+
public TopologyRecoveryFilter TopologyRecoveryFilter { get; set; } = new TopologyRecoveryFilter();
267+
268+
/// <summary>
269+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
270+
/// </summary>
271+
public TopologyRecoveryExceptionHandler TopologyRecoveryExceptionHandler { get; set; } = new TopologyRecoveryExceptionHandler();
272+
262273
/// <summary>
263274
/// Construct a fresh instance, with all fields set to their respective defaults.
264275
/// </summary>
@@ -535,6 +546,8 @@ private ConnectionConfig CreateConfig(string clientProvidedName)
535546
RequestedChannelMax,
536547
RequestedFrameMax,
537548
TopologyRecoveryEnabled,
549+
TopologyRecoveryFilter,
550+
TopologyRecoveryExceptionHandler,
538551
NetworkRecoveryInterval,
539552
RequestedHeartbeat,
540553
ContinuationTimeout,
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedBinding
7+
{
8+
string Source { get; }
9+
10+
string Destination { get; }
11+
12+
string RoutingKey { get; }
13+
14+
IDictionary<string, object>? Arguments { get; }
15+
}
16+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedConsumer
7+
{
8+
string ConsumerTag { get; }
9+
10+
string Queue { get; }
11+
12+
bool AutoAck { get; }
13+
14+
bool Exclusive { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedExchange
7+
{
8+
string Name { get; }
9+
10+
string Type { get; }
11+
12+
bool Durable { get; }
13+
14+
bool AutoDelete { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
}
18+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System.Collections.Generic;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
#nullable enable
6+
public interface IRecordedQueue
7+
{
8+
string Name { get; }
9+
10+
bool Durable { get; }
11+
12+
bool Exclusive { get; }
13+
14+
bool AutoDelete { get; }
15+
16+
IDictionary<string, object>? Arguments { get; }
17+
18+
bool IsServerNamed { get; }
19+
}
20+
}
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Custom logic for handling topology recovery exceptions that match the specified filters.
7+
/// </summary>
8+
public class TopologyRecoveryExceptionHandler
9+
{
10+
private static readonly Func<IRecordedExchange, Exception, bool> s_defaultExchangeExceptionCondition = (e, ex) => true;
11+
private static readonly Func<IRecordedQueue, Exception, bool> s_defaultQueueExceptionCondition = (q, ex) => true;
12+
private static readonly Func<IRecordedBinding, Exception, bool> s_defaultBindingExceptionCondition = (b, ex) => true;
13+
private static readonly Func<IRecordedConsumer, Exception, bool> s_defaultConsumerExceptionCondition = (c, ex) => true;
14+
15+
private Func<IRecordedExchange, Exception, bool> _exchangeRecoveryExceptionCondition;
16+
private Func<IRecordedQueue, Exception, bool> _queueRecoveryExceptionCondition;
17+
private Func<IRecordedBinding, Exception, bool> _bindingRecoveryExceptionCondition;
18+
private Func<IRecordedConsumer, Exception, bool> _consumerRecoveryExceptionCondition;
19+
private Action<IRecordedExchange, Exception, IConnection> _exchangeRecoveryExceptionHandler;
20+
private Action<IRecordedQueue, Exception, IConnection> _queueRecoveryExceptionHandler;
21+
private Action<IRecordedBinding, Exception, IConnection> _bindingRecoveryExceptionHandler;
22+
private Action<IRecordedConsumer, Exception, IConnection> _consumerRecoveryExceptionHandler;
23+
24+
/// <summary>
25+
/// Decides which exchange recovery exceptions the custom exception handler is applied to.
26+
/// Default condition applies the exception handler to all exchange recovery exceptions.
27+
/// </summary>
28+
public Func<IRecordedExchange, Exception, bool> ExchangeRecoveryExceptionCondition
29+
{
30+
get => _exchangeRecoveryExceptionCondition ?? s_defaultExchangeExceptionCondition;
31+
32+
set
33+
{
34+
if (_exchangeRecoveryExceptionCondition != null)
35+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
36+
37+
_exchangeRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
38+
}
39+
}
40+
41+
/// <summary>
42+
/// Decides which queue recovery exceptions the custom exception handler is applied to.
43+
/// Default condition applies the exception handler to all queue recovery exceptions.
44+
/// </summary>
45+
public Func<IRecordedQueue, Exception, bool> QueueRecoveryExceptionCondition
46+
{
47+
get => _queueRecoveryExceptionCondition ?? s_defaultQueueExceptionCondition;
48+
49+
set
50+
{
51+
if (_queueRecoveryExceptionCondition != null)
52+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionCondition)} after it has been initialized.");
53+
54+
_queueRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionCondition));
55+
}
56+
}
57+
58+
/// <summary>
59+
/// Decides which binding recovery exceptions the custom exception handler is applied to.
60+
/// Default condition applies the exception handler to all binding recovery exceptions.
61+
/// </summary>
62+
public Func<IRecordedBinding, Exception, bool> BindingRecoveryExceptionCondition
63+
{
64+
get => _bindingRecoveryExceptionCondition ?? s_defaultBindingExceptionCondition;
65+
66+
set
67+
{
68+
if (_bindingRecoveryExceptionCondition != null)
69+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionCondition)} after it has been initialized.");
70+
71+
_bindingRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionCondition));
72+
}
73+
}
74+
75+
/// <summary>
76+
/// Decides which consumer recovery exceptions the custom exception handler is applied to.
77+
/// Default condition applies the exception handler to all consumer recovery exceptions.
78+
/// </summary>
79+
public Func<IRecordedConsumer, Exception, bool> ConsumerRecoveryExceptionCondition
80+
{
81+
get => _consumerRecoveryExceptionCondition ?? s_defaultConsumerExceptionCondition;
82+
83+
set
84+
{
85+
if (_consumerRecoveryExceptionCondition != null)
86+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionCondition)} after it has been initialized.");
87+
88+
_consumerRecoveryExceptionCondition = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionCondition));
89+
}
90+
}
91+
92+
/// <summary>
93+
/// Retries, or otherwise handles, an exception thrown when attempting to recover an exchange.
94+
/// </summary>
95+
public Action<IRecordedExchange, Exception, IConnection> ExchangeRecoveryExceptionHandler
96+
{
97+
get => _exchangeRecoveryExceptionHandler;
98+
99+
set
100+
{
101+
if (_exchangeRecoveryExceptionHandler != null)
102+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeRecoveryExceptionHandler)} after it has been initialized.");
103+
104+
_exchangeRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ExchangeRecoveryExceptionHandler));
105+
}
106+
}
107+
108+
/// <summary>
109+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a queue.
110+
/// </summary>
111+
public Action<IRecordedQueue, Exception, IConnection> QueueRecoveryExceptionHandler
112+
{
113+
get => _queueRecoveryExceptionHandler;
114+
115+
set
116+
{
117+
if (_queueRecoveryExceptionHandler != null)
118+
throw new InvalidOperationException($"Cannot modify {nameof(QueueRecoveryExceptionHandler)} after it has been initialized.");
119+
120+
_queueRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(QueueRecoveryExceptionHandler));
121+
}
122+
}
123+
124+
/// <summary>
125+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a binding.
126+
/// </summary>
127+
public Action<IRecordedBinding, Exception, IConnection> BindingRecoveryExceptionHandler
128+
{
129+
get => _bindingRecoveryExceptionHandler;
130+
131+
set
132+
{
133+
if (_bindingRecoveryExceptionHandler != null)
134+
throw new InvalidOperationException($"Cannot modify {nameof(BindingRecoveryExceptionHandler)} after it has been initialized.");
135+
136+
_bindingRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(BindingRecoveryExceptionHandler));
137+
}
138+
}
139+
140+
/// <summary>
141+
/// Retries, or otherwise handles, an exception thrown when attempting to recover a consumer.
142+
/// </summary>
143+
public Action<IRecordedConsumer, Exception, IConnection> ConsumerRecoveryExceptionHandler
144+
{
145+
get => _consumerRecoveryExceptionHandler;
146+
147+
set
148+
{
149+
if (_consumerRecoveryExceptionHandler != null)
150+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerRecoveryExceptionHandler)} after it has been initialized.");
151+
152+
_consumerRecoveryExceptionHandler = value ?? throw new ArgumentNullException(nameof(ConsumerRecoveryExceptionHandler));
153+
}
154+
}
155+
}
156+
}
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
using System;
2+
3+
namespace RabbitMQ.Client
4+
{
5+
/// <summary>
6+
/// Filter to know which entities (exchanges, queues, bindings, consumers) should be recovered by topology recovery.
7+
/// By default, allows all entities to be recovered.
8+
/// </summary>
9+
public class TopologyRecoveryFilter
10+
{
11+
private static readonly Func<IRecordedExchange, bool> s_defaultExchangeFilter = exchange => true;
12+
private static readonly Func<IRecordedQueue, bool> s_defaultQueueFilter = queue => true;
13+
private static readonly Func<IRecordedBinding, bool> s_defaultBindingFilter = binding => true;
14+
private static readonly Func<IRecordedConsumer, bool> s_defaultConsumerFilter = consumer => true;
15+
16+
private Func<IRecordedExchange, bool> _exchangeFilter;
17+
private Func<IRecordedQueue, bool> _queueFilter;
18+
private Func<IRecordedBinding, bool> _bindingFilter;
19+
private Func<IRecordedConsumer, bool> _consumerFilter;
20+
21+
/// <summary>
22+
/// Decides whether an exchange is recovered or not.
23+
/// </summary>
24+
public Func<IRecordedExchange, bool> ExchangeFilter
25+
{
26+
get => _exchangeFilter ?? s_defaultExchangeFilter;
27+
28+
set
29+
{
30+
if (_exchangeFilter != null)
31+
throw new InvalidOperationException($"Cannot modify {nameof(ExchangeFilter)} after it has been initialized.");
32+
33+
_exchangeFilter = value ?? throw new ArgumentNullException(nameof(ExchangeFilter));
34+
}
35+
}
36+
37+
/// <summary>
38+
/// Decides whether a queue is recovered or not.
39+
/// </summary>
40+
public Func<IRecordedQueue, bool> QueueFilter
41+
{
42+
get => _queueFilter ?? s_defaultQueueFilter;
43+
44+
set
45+
{
46+
if (_queueFilter != null)
47+
throw new InvalidOperationException($"Cannot modify {nameof(QueueFilter)} after it has been initialized.");
48+
49+
_queueFilter = value ?? throw new ArgumentNullException(nameof(QueueFilter));
50+
}
51+
}
52+
53+
/// <summary>
54+
/// Decides whether a binding is recovered or not.
55+
/// </summary>
56+
public Func<IRecordedBinding, bool> BindingFilter
57+
{
58+
get => _bindingFilter ?? s_defaultBindingFilter;
59+
60+
set
61+
{
62+
if (_bindingFilter != null)
63+
throw new InvalidOperationException($"Cannot modify {nameof(BindingFilter)} after it has been initialized.");
64+
65+
_bindingFilter = value ?? throw new ArgumentNullException(nameof(BindingFilter));
66+
}
67+
}
68+
69+
/// <summary>
70+
/// Decides whether a consumer is recovered or not.
71+
/// </summary>
72+
public Func<IRecordedConsumer, bool> ConsumerFilter
73+
{
74+
get => _consumerFilter ?? s_defaultConsumerFilter;
75+
76+
set
77+
{
78+
if (_consumerFilter != null)
79+
throw new InvalidOperationException($"Cannot modify {nameof(ConsumerFilter)} after it has been initialized.");
80+
81+
_consumerFilter = value ?? throw new ArgumentNullException(nameof(ConsumerFilter));
82+
}
83+
}
84+
}
85+
}

projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recording.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ internal void DeleteAutoDeleteExchange(string exchangeName)
7777
{
7878
lock (_recordedEntitiesLock)
7979
{
80-
if (_recordedExchanges.TryGetValue(exchangeName, out var recordedExchange) && recordedExchange.IsAutoDelete)
80+
if (_recordedExchanges.TryGetValue(exchangeName, out var recordedExchange) && recordedExchange.AutoDelete)
8181
{
8282
if (!AnyBindingsOnExchange(exchangeName))
8383
{
@@ -204,7 +204,7 @@ internal void DeleteRecordedConsumer(string consumerTag)
204204

205205
void DeleteAutoDeleteQueue(string queue)
206206
{
207-
if (_recordedQueues.TryGetValue(queue, out var recordedQueue) && recordedQueue.IsAutoDelete)
207+
if (_recordedQueues.TryGetValue(queue, out var recordedQueue) && recordedQueue.AutoDelete)
208208
{
209209
// last consumer on this connection is gone, remove recorded queue if it is auto-deleted.
210210
if (!AnyConsumersOnQueue(queue))

0 commit comments

Comments
 (0)