27
27
import java .util .concurrent .ConcurrentHashMap ;
28
28
import java .util .concurrent .ConcurrentMap ;
29
29
import java .util .function .Function ;
30
+ import java .util .function .Supplier ;
30
31
import java .util .stream .Collectors ;
31
32
import org .slf4j .Logger ;
32
33
import org .slf4j .LoggerFactory ;
33
34
34
35
final class ConnectionUtils {
35
36
37
+ static final RetryStrategy NO_RETRY_STRATEGY = Supplier ::get ;
38
+
36
39
static final ConnectionSettings .AffinityStrategy
37
40
LEADER_FOR_PUBLISHING_FOLLOWERS_FOR_CONSUMING_STRATEGY =
38
41
new LeaderForPublishingFollowersForConsumingStrategy ();
@@ -50,11 +53,11 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
50
53
AmqpManagement management ,
51
54
AffinityContext context ,
52
55
AffinityCache affinityCache ,
53
- ConnectionSettings .AffinityStrategy strategy ) {
54
- // TODO add retry for sensitive operations in affinity mechanism
56
+ ConnectionSettings .AffinityStrategy strategy ,
57
+ RetryStrategy retryStrategy ) {
55
58
if (context == null ) {
56
59
// no affinity asked, we create a connection and return it
57
- return connectionFactory .apply (null );
60
+ return retryStrategy . maybeRetry (() -> connectionFactory .apply (null ) );
58
61
}
59
62
try {
60
63
AmqpConnection .NativeConnectionWrapper pickedConnection = null ;
@@ -66,8 +69,8 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
66
69
attemptCount ++;
67
70
AmqpConnection .NativeConnectionWrapper connectionWrapper = null ;
68
71
if (info == null ) {
69
- connectionWrapper = connectionFactory .apply (null );
70
- info = lookUpQueueInfo (management , context , affinityCache );
72
+ connectionWrapper = retryStrategy . maybeRetry (() -> connectionFactory .apply (null ) );
73
+ info = lookUpQueueInfo (management , context , affinityCache , retryStrategy );
71
74
queueInfoRefreshed = true ;
72
75
}
73
76
if (info == null ) {
@@ -89,7 +92,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
89
92
.map (affinityCache ::nodenameToAddress )
90
93
.filter (Objects ::nonNull )
91
94
.collect (Collectors .toList ());
92
- connectionWrapper = connectionFactory .apply (addressHints );
95
+ connectionWrapper = retryStrategy . maybeRetry (() -> connectionFactory .apply (addressHints ) );
93
96
}
94
97
LOGGER .debug ("Nodes matching affinity {}: {}." , context , nodesWithAffinity );
95
98
LOGGER .debug ("Currently connected to node {}." , connectionWrapper .nodename ());
@@ -98,7 +101,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
98
101
if (!queueInfoRefreshed ) {
99
102
LOGGER .debug (
100
103
"Found affinity, but refreshing queue information to check affinity is still valid." );
101
- info = lookUpQueueInfo (management , context , affinityCache );
104
+ info = lookUpQueueInfo (management , context , affinityCache , retryStrategy );
102
105
if (info == null ) {
103
106
LOGGER .debug ("Could not look up info for queue '{}'" , context .queue ());
104
107
pickedConnection = connectionWrapper ;
@@ -129,7 +132,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
129
132
LOGGER .debug (
130
133
"Affinity {} not found with node {}." , context , connectionWrapper .nodename ());
131
134
if (!queueInfoRefreshed ) {
132
- info = lookUpQueueInfo (management , context , affinityCache );
135
+ info = lookUpQueueInfo (management , context , affinityCache , retryStrategy );
133
136
if (info != null ) {
134
137
nodesWithAffinity = strategy .nodesWithAffinity (context , info );
135
138
queueInfoRefreshed = true ;
@@ -147,18 +150,24 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
147
150
}
148
151
149
152
private static Management .QueueInfo lookUpQueueInfo (
150
- AmqpManagement management , AffinityContext affinity , AffinityCache cache ) {
151
- Management .QueueInfo info = null ;
152
- management .init ();
153
- try {
154
- info = management .queueInfo (affinity .queue ());
155
- cache .queueInfo (info );
156
- } catch (AmqpException .AmqpEntityDoesNotExistException e ) {
157
- LOGGER .debug ("Queue '{}' does not exist." , affinity .queue ());
158
- cache .clearQueueInfoEntry (affinity .queue ());
159
- // we just return null, caller will have to return the last connection
160
- }
161
- return info ;
153
+ AmqpManagement management ,
154
+ AffinityContext affinity ,
155
+ AffinityCache cache ,
156
+ RetryStrategy retryStrategy ) {
157
+ return retryStrategy .maybeRetry (
158
+ () -> {
159
+ Management .QueueInfo info = null ;
160
+ management .init ();
161
+ try {
162
+ info = management .queueInfo (affinity .queue ());
163
+ cache .queueInfo (info );
164
+ } catch (AmqpException .AmqpEntityDoesNotExistException e ) {
165
+ LOGGER .debug ("Queue '{}' does not exist." , affinity .queue ());
166
+ cache .clearQueueInfoEntry (affinity .queue ());
167
+ // we just return null, caller will have to return the last connection
168
+ }
169
+ return info ;
170
+ });
162
171
}
163
172
164
173
// TODO clean affinity cache (LRU or size-based)
@@ -298,4 +307,10 @@ public List<String> nodesWithAffinity(
298
307
return nodesWithAffinity ;
299
308
}
300
309
}
310
+
311
+ @ FunctionalInterface
312
+ interface RetryStrategy {
313
+
314
+ <T > T maybeRetry (Supplier <T > task );
315
+ }
301
316
}
0 commit comments