Skip to content

Commit 2052ca5

Browse files
committed
Make affinity strategy pluggable
1 parent 90b57ed commit 2052ca5

File tree

6 files changed

+134
-75
lines changed

6 files changed

+134
-75
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionSettings.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package com.rabbitmq.client.amqp;
1919

2020
import java.time.Duration;
21+
import java.util.List;
2122
import javax.net.ssl.SSLContext;
2223

2324
public interface ConnectionSettings<T> {
@@ -72,11 +73,26 @@ interface Affinity<T> {
7273

7374
Affinity<T> reuse(boolean reuse);
7475

76+
Affinity<T> strategy(AffinityStrategy strategy);
77+
7578
T connection();
7679

7780
enum Operation {
7881
PUBLISH,
7982
CONSUME
8083
}
8184
}
85+
86+
interface AffinityContext {
87+
88+
String queue();
89+
90+
Affinity.Operation operation();
91+
}
92+
93+
@FunctionalInterface
94+
interface AffinityStrategy {
95+
96+
List<String> nodesWithAffinity(AffinityContext context, Management.QueueInfo info);
97+
}
8298
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
6868
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
6969
private final DefaultConnectionSettings<?> connectionSettings;
7070
private final Supplier<SessionHandler> sessionHandlerSupplier;
71-
private final ConnectionUtils.ConnectionAffinity affinity;
71+
private final ConnectionUtils.AffinityContext affinity;
72+
private final ConnectionSettings.AffinityStrategy affinityStrategy;
7273

7374
AmqpConnection(AmqpConnectionBuilder builder) {
7475
super(builder.listeners());
@@ -113,12 +114,16 @@ final class AmqpConnection extends ResourceBase implements Connection {
113114
this.recoveryRequestQueue = null;
114115
this.recoveryLoop = null;
115116
}
116-
this.affinity =
117-
this.connectionSettings.affinity().activated()
118-
? new ConnectionUtils.ConnectionAffinity(
119-
this.connectionSettings.affinity().queue(),
120-
this.connectionSettings.affinity().operation())
121-
: null;
117+
if (this.connectionSettings.affinity().activated()) {
118+
this.affinity =
119+
new ConnectionUtils.AffinityContext(
120+
this.connectionSettings.affinity().queue(),
121+
this.connectionSettings.affinity().operation());
122+
this.affinityStrategy = connectionSettings.affinity().strategy();
123+
} else {
124+
this.affinity = null;
125+
this.affinityStrategy = null;
126+
}
122127
this.management = createManagement();
123128
NativeConnectionWrapper ncw =
124129
ConnectionUtils.enforceAffinity(
@@ -130,7 +135,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
130135
},
131136
this.management,
132137
this.affinity,
133-
this.environment.affinityCache());
138+
this.environment.affinityCache(),
139+
this.affinityStrategy);
134140
this.sync(ncw);
135141
this.state(OPEN);
136142
this.environment.metricsCollector().openConnection();
@@ -397,7 +403,8 @@ private NativeConnectionWrapper recoverNativeConnection(
397403
},
398404
this.management,
399405
this.affinity,
400-
this.environment.affinityCache());
406+
this.environment.affinityCache(),
407+
this.affinityStrategy);
401408
return result;
402409
} catch (Exception ex) {
403410
LOGGER.info("Error while trying to recover connection", ex);
@@ -613,7 +620,7 @@ String connectionNodename() {
613620
return this.connectionNodename;
614621
}
615622

616-
ConnectionUtils.ConnectionAffinity affinity() {
623+
ConnectionUtils.AffinityContext affinity() {
617624
return this.affinity;
618625
}
619626

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ AmqpConnection connection(AmqpConnectionBuilder builder) {
4646
&& builder.connectionSettings().affinity().reuse()) {
4747
builder.connectionSettings().affinity().validate();
4848

49-
ConnectionUtils.ConnectionAffinity affinity =
50-
new ConnectionUtils.ConnectionAffinity(
49+
ConnectionUtils.AffinityContext affinity =
50+
new ConnectionUtils.AffinityContext(
5151
builder.connectionSettings().affinity().queue(),
5252
builder.connectionSettings().affinity().operation());
5353
connection =

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 71 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -33,17 +33,21 @@
3333

3434
final class ConnectionUtils {
3535

36+
static final ConnectionSettings.AffinityStrategy PREFER_LEADER_FOR_PUBLISHING_STRATEGY =
37+
new PreferLeaderForPublishingAffinityStrategy();
38+
3639
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionUtils.class);
3740

3841
private ConnectionUtils() {}
3942

4043
static AmqpConnection.NativeConnectionWrapper enforceAffinity(
4144
Function<List<Address>, AmqpConnection.NativeConnectionWrapper> connectionFactory,
4245
AmqpManagement management,
43-
ConnectionAffinity affinity,
44-
AffinityCache affinityCache) {
46+
AffinityContext context,
47+
AffinityCache affinityCache,
48+
ConnectionSettings.AffinityStrategy strategy) {
4549
// TODO add retry for sensitive operations in affinity mechanism
46-
if (affinity == null) {
50+
if (context == null) {
4751
// no affinity asked, we create a connection and return it
4852
return connectionFactory.apply(null);
4953
}
@@ -52,13 +56,13 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
5256
int attemptCount = 0;
5357
boolean queueInfoRefreshed = false;
5458
List<String> nodesWithAffinity = null;
55-
Management.QueueInfo info = affinityCache.queueInfo(affinity.queue());
59+
Management.QueueInfo info = affinityCache.queueInfo(context.queue());
5660
while (pickedConnection == null) {
5761
attemptCount++;
5862
AmqpConnection.NativeConnectionWrapper connectionWrapper = null;
5963
if (info == null) {
6064
connectionWrapper = connectionFactory.apply(null);
61-
info = lookUpQueueInfo(management, affinity, affinityCache);
65+
info = lookUpQueueInfo(management, context, affinityCache);
6266
queueInfoRefreshed = true;
6367
}
6468
if (info == null) {
@@ -72,7 +76,7 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
7276
info.leader(),
7377
info.replicas());
7478
if (nodesWithAffinity == null) {
75-
nodesWithAffinity = findAffinity(affinity, info);
79+
nodesWithAffinity = strategy.nodesWithAffinity(context, info);
7680
}
7781
if (connectionWrapper == null) {
7882
List<Address> addressHints =
@@ -82,19 +86,19 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
8286
.collect(Collectors.toList());
8387
connectionWrapper = connectionFactory.apply(addressHints);
8488
}
85-
LOGGER.debug("Nodes matching affinity {}: {}.", affinity, nodesWithAffinity);
89+
LOGGER.debug("Nodes matching affinity {}: {}.", context, nodesWithAffinity);
8690
LOGGER.debug("Currently connected to node {}.", connectionWrapper.nodename());
8791
affinityCache.nodenameToAddress(connectionWrapper.nodename(), connectionWrapper.address());
8892
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
8993
if (!queueInfoRefreshed) {
9094
LOGGER.debug(
9195
"Found affinity, but refreshing queue information to check affinity is still valid.");
92-
info = lookUpQueueInfo(management, affinity, affinityCache);
96+
info = lookUpQueueInfo(management, context, affinityCache);
9397
if (info == null) {
94-
LOGGER.debug("Could not look up info for queue '{}'", affinity.queue());
98+
LOGGER.debug("Could not look up info for queue '{}'", context.queue());
9599
pickedConnection = connectionWrapper;
96100
} else {
97-
nodesWithAffinity = findAffinity(affinity, info);
101+
nodesWithAffinity = strategy.nodesWithAffinity(context, info);
98102
queueInfoRefreshed = true;
99103
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
100104
pickedConnection = connectionWrapper;
@@ -113,16 +117,16 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
113117
} else if (attemptCount == 5) {
114118
LOGGER.debug(
115119
"Could not find affinity {} after {} attempt(s), using last connection.",
116-
affinity,
120+
context,
117121
attemptCount);
118122
pickedConnection = connectionWrapper;
119123
} else {
120124
LOGGER.debug(
121-
"Affinity {} not found with node {}.", affinity, connectionWrapper.nodename());
125+
"Affinity {} not found with node {}.", context, connectionWrapper.nodename());
122126
if (!queueInfoRefreshed) {
123-
info = lookUpQueueInfo(management, affinity, affinityCache);
127+
info = lookUpQueueInfo(management, context, affinityCache);
124128
if (info != null) {
125-
nodesWithAffinity = findAffinity(affinity, info);
129+
nodesWithAffinity = strategy.nodesWithAffinity(context, info);
126130
queueInfoRefreshed = true;
127131
}
128132
}
@@ -132,13 +136,13 @@ static AmqpConnection.NativeConnectionWrapper enforceAffinity(
132136
}
133137
return pickedConnection;
134138
} catch (RuntimeException e) {
135-
LOGGER.warn("Cannot enforce affinity {} of error when looking up queue", affinity, e);
139+
LOGGER.warn("Cannot enforce affinity {} of error when looking up queue", context, e);
136140
throw e;
137141
}
138142
}
139143

140144
private static Management.QueueInfo lookUpQueueInfo(
141-
AmqpManagement management, ConnectionAffinity affinity, AffinityCache cache) {
145+
AmqpManagement management, AffinityContext affinity, AffinityCache cache) {
142146
Management.QueueInfo info = null;
143147
management.init();
144148
try {
@@ -185,57 +189,17 @@ Address nodenameToAddress(String name) {
185189
}
186190
}
187191

188-
static List<String> findAffinity(ConnectionAffinity affinity, Management.QueueInfo info) {
189-
ConnectionSettings.Affinity.Operation operation = affinity.operation();
190-
String leader = info.leader();
191-
List<String> replicas = info.replicas() == null ? Collections.emptyList() : info.replicas();
192-
List<String> nodesWithAffinity;
193-
LOGGER.debug(
194-
"Trying to find affinity {} with leader = {}, replicas = {}", affinity, leader, replicas);
195-
if (info.type() == Management.QueueType.QUORUM || info.type() == Management.QueueType.STREAM) {
196-
// we may choose between leader and replicas
197-
if (operation == ConnectionSettings.Affinity.Operation.PUBLISH) {
198-
if (leader == null || leader.isBlank()) {
199-
nodesWithAffinity = Collections.emptyList();
200-
} else {
201-
nodesWithAffinity = List.of(leader);
202-
}
203-
} else if (operation == ConnectionSettings.Affinity.Operation.CONSUME) {
204-
List<String> followers =
205-
replicas.stream()
206-
.filter(Objects::nonNull)
207-
.filter(r -> !r.equals(leader))
208-
.collect(Collectors.toList());
209-
if (!followers.isEmpty()) {
210-
nodesWithAffinity = List.copyOf(followers);
211-
} else if (leader != null && !leader.isBlank()) {
212-
nodesWithAffinity = List.of(leader);
213-
} else {
214-
nodesWithAffinity = Collections.emptyList();
215-
}
216-
} else {
217-
// we don't care about the operation, we just return a replica
218-
nodesWithAffinity = List.copyOf(replicas);
219-
}
220-
} else {
221-
// classic queue, leader and replica are the same
222-
nodesWithAffinity = List.copyOf(replicas);
223-
}
224-
LOGGER.debug("Nodes with affinity: {}", nodesWithAffinity);
225-
return nodesWithAffinity;
226-
}
227-
228-
static class ConnectionAffinity {
192+
static class AffinityContext implements ConnectionSettings.AffinityContext {
229193

230194
private final String queue;
231195
private final ConnectionSettings.Affinity.Operation operation;
232196

233-
ConnectionAffinity(String queue, ConnectionSettings.Affinity.Operation operation) {
197+
AffinityContext(String queue, ConnectionSettings.Affinity.Operation operation) {
234198
this.queue = queue;
235199
this.operation = operation;
236200
}
237201

238-
String queue() {
202+
public String queue() {
239203
return this.queue;
240204
}
241205

@@ -252,7 +216,7 @@ public String toString() {
252216
public boolean equals(Object o) {
253217
if (this == o) return true;
254218
if (o == null || getClass() != o.getClass()) return false;
255-
ConnectionAffinity that = (ConnectionAffinity) o;
219+
AffinityContext that = (AffinityContext) o;
256220
return Objects.equals(queue, that.queue) && operation == that.operation;
257221
}
258222

@@ -261,4 +225,51 @@ public int hashCode() {
261225
return Objects.hash(queue, operation);
262226
}
263227
}
228+
229+
static class PreferLeaderForPublishingAffinityStrategy
230+
implements ConnectionSettings.AffinityStrategy {
231+
232+
@Override
233+
public List<String> nodesWithAffinity(
234+
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
235+
ConnectionSettings.Affinity.Operation operation = context.operation();
236+
String leader = info.leader();
237+
List<String> replicas = info.replicas() == null ? Collections.emptyList() : info.replicas();
238+
List<String> nodesWithAffinity;
239+
LOGGER.debug(
240+
"Trying to find affinity {} with leader = {}, replicas = {}", context, leader, replicas);
241+
if (info.type() == Management.QueueType.QUORUM
242+
|| info.type() == Management.QueueType.STREAM) {
243+
// we may choose between leader and replicas
244+
if (operation == ConnectionSettings.Affinity.Operation.PUBLISH) {
245+
if (leader == null || leader.isBlank()) {
246+
nodesWithAffinity = Collections.emptyList();
247+
} else {
248+
nodesWithAffinity = List.of(leader);
249+
}
250+
} else if (operation == ConnectionSettings.Affinity.Operation.CONSUME) {
251+
List<String> followers =
252+
replicas.stream()
253+
.filter(Objects::nonNull)
254+
.filter(r -> !r.equals(leader))
255+
.collect(Collectors.toList());
256+
if (!followers.isEmpty()) {
257+
nodesWithAffinity = List.copyOf(followers);
258+
} else if (leader != null && !leader.isBlank()) {
259+
nodesWithAffinity = List.of(leader);
260+
} else {
261+
nodesWithAffinity = Collections.emptyList();
262+
}
263+
} else {
264+
// we don't care about the operation, we just return a replica
265+
nodesWithAffinity = List.copyOf(replicas);
266+
}
267+
} else {
268+
// classic queue, leader and replica are the same
269+
nodesWithAffinity = List.copyOf(replicas);
270+
}
271+
LOGGER.debug("Nodes with affinity: {}", nodesWithAffinity);
272+
return nodesWithAffinity;
273+
}
274+
}
264275
}

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ static class DefaultAffinity<T> implements Affinity<T> {
413413
private String queue;
414414
private Operation operation;
415415
private boolean reuse = false;
416+
private AffinityStrategy strategy = ConnectionUtils.PREFER_LEADER_FOR_PUBLISHING_STRATEGY;
416417

417418
DefaultAffinity(DefaultConnectionSettings<T> connectionSettings) {
418419
this.connectionSettings = connectionSettings;
@@ -436,6 +437,13 @@ public Affinity<T> reuse(boolean reuse) {
436437
return this;
437438
}
438439

440+
@Override
441+
public Affinity<T> strategy(AffinityStrategy strategy) {
442+
Assert.notNull(strategy, "Affinity strategy cannot be null");
443+
this.strategy = strategy;
444+
return this;
445+
}
446+
439447
@Override
440448
public T connection() {
441449
return this.connectionSettings.toReturn();
@@ -453,10 +461,15 @@ boolean reuse() {
453461
return this.reuse;
454462
}
455463

464+
AffinityStrategy strategy() {
465+
return strategy;
466+
}
467+
456468
void copyTo(Affinity<?> copy) {
457469
copy.queue(this.queue);
458470
copy.operation(this.operation);
459471
copy.reuse(this.reuse);
472+
copy.strategy(this.strategy);
460473
}
461474

462475
boolean activated() {

0 commit comments

Comments
 (0)