Skip to content

Commit 59f0b9d

Browse files
committed
Move affinity algorithm in dedicated class
It is isolated enough to be outside of the AMQP connection class and tested independently.
1 parent c768f89 commit 59f0b9d

File tree

3 files changed

+114
-104
lines changed

3 files changed

+114
-104
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 15 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,8 @@
3030
import java.util.concurrent.atomic.AtomicLong;
3131
import java.util.concurrent.atomic.AtomicReference;
3232
import java.util.function.BiConsumer;
33-
import java.util.function.Function;
3433
import java.util.function.Predicate;
3534
import java.util.function.Supplier;
36-
import java.util.stream.Collectors;
3735
import org.apache.qpid.protonj2.client.ConnectionOptions;
3836
import org.apache.qpid.protonj2.client.DisconnectionEvent;
3937
import org.apache.qpid.protonj2.client.Session;
@@ -123,19 +121,17 @@ final class AmqpConnection extends ResourceBase implements Connection {
123121
: null;
124122
this.management = createManagement();
125123
NativeConnectionWrapper ncw =
126-
enforceAffinity(
124+
ConnectionUtils.enforceAffinity(
127125
addrs -> {
128126
NativeConnectionWrapper wrapper =
129127
connect(this.connectionSettings, builder.name(), disconnectHandler, addrs);
130-
this.nativeConnection = wrapper.connection;
128+
this.nativeConnection = wrapper.connection();
131129
return wrapper;
132130
},
133131
this.management,
134132
this.affinity,
135133
this.environment.affinityCache());
136-
this.connectionAddress = ncw.address;
137-
this.connectionNodename = ncw.nodename;
138-
this.nativeConnection = ncw.connection;
134+
this.sync(ncw);
139135
this.state(OPEN);
140136
this.environment.metricsCollector().openConnection();
141137
}
@@ -235,96 +231,10 @@ private NativeConnectionWrapper connect(
235231
}
236232
}
237233

238-
static NativeConnectionWrapper enforceAffinity(
239-
Function<List<Address>, NativeConnectionWrapper> connectionFactory,
240-
AmqpManagement management,
241-
ConnectionUtils.ConnectionAffinity affinity,
242-
ConnectionUtils.AffinityCache affinityCache) {
243-
// TODO add retry for sensitive operations in affinity mechanism
244-
if (affinity == null) {
245-
return connectionFactory.apply(null);
246-
}
247-
try {
248-
NativeConnectionWrapper pickedConnection = null;
249-
int attemptCount = 0;
250-
boolean queueInfoRefreshed = false;
251-
List<String> nodesWithAffinity = null;
252-
Management.QueueInfo info = affinityCache.queueInfo(affinity.queue());
253-
while (pickedConnection == null) {
254-
attemptCount++;
255-
NativeConnectionWrapper connectionWrapper = null;
256-
if (info == null) {
257-
connectionWrapper = connectionFactory.apply(null);
258-
management.init();
259-
info = management.queueInfo(affinity.queue());
260-
affinityCache.queueInfo(info);
261-
queueInfoRefreshed = true;
262-
}
263-
LOGGER.debug(
264-
"Looking affinity with queue '{}' (type = {}, leader = {}, replicas = {})",
265-
info.name(),
266-
info.type(),
267-
info.leader(),
268-
info.replicas());
269-
if (nodesWithAffinity == null) {
270-
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
271-
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
272-
}
273-
if (connectionWrapper == null) {
274-
List<Address> addressHints =
275-
nodesWithAffinity.stream()
276-
.map(affinityCache::nodenameToAddress)
277-
.filter(Objects::nonNull)
278-
.collect(Collectors.toList());
279-
connectionWrapper = connectionFactory.apply(addressHints);
280-
}
281-
LOGGER.debug("Currently connected to node {}", connectionWrapper.nodename);
282-
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
283-
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
284-
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
285-
if (!queueInfoRefreshed) {
286-
LOGGER.debug("Refreshing queue information.");
287-
management.init();
288-
info = management.queueInfo(affinity.queue());
289-
affinityCache.queueInfo(info);
290-
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
291-
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
292-
queueInfoRefreshed = true;
293-
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
294-
pickedConnection = connectionWrapper;
295-
} else {
296-
management.releaseResources();
297-
connectionWrapper.connection.close();
298-
}
299-
} else {
300-
pickedConnection = connectionWrapper;
301-
}
302-
} else if (attemptCount == 5) {
303-
LOGGER.debug(
304-
"Could not find affinity {} after {} attempt(s), using last connection.",
305-
affinity,
306-
attemptCount);
307-
pickedConnection = connectionWrapper;
308-
} else {
309-
LOGGER.debug("Affinity {} not found with node {}.", affinity, connectionWrapper.nodename);
310-
if (!queueInfoRefreshed) {
311-
LOGGER.debug("Refreshing queue information.");
312-
management.init();
313-
info = management.queueInfo(affinity.queue());
314-
affinityCache.queueInfo(info);
315-
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
316-
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
317-
queueInfoRefreshed = true;
318-
}
319-
management.releaseResources();
320-
connectionWrapper.connection.close();
321-
}
322-
}
323-
return pickedConnection;
324-
} catch (RuntimeException e) {
325-
LOGGER.warn("Cannot enforce affinity {} of error when looking up queue", affinity, e);
326-
throw e;
327-
}
234+
private void sync(NativeConnectionWrapper wrapper) {
235+
this.connectionAddress = wrapper.address();
236+
this.connectionNodename = wrapper.nodename();
237+
this.nativeConnection = wrapper.connection();
328238
}
329239

330240
private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connection connection)
@@ -419,9 +329,7 @@ private void recoverAfterConnectionFailure(
419329
NativeConnectionWrapper ncw =
420330
recoverNativeConnection(
421331
recoveryConfiguration, connectionName, disconnectedHandlerReference);
422-
this.connectionAddress = ncw.address;
423-
this.connectionNodename = ncw.nodename;
424-
this.nativeConnection = ncw.connection;
332+
this.sync(ncw);
425333
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
426334
} catch (Exception ex) {
427335
if (ex instanceof InterruptedException) {
@@ -476,15 +384,15 @@ private NativeConnectionWrapper recoverNativeConnection(
476384

477385
try {
478386
NativeConnectionWrapper result =
479-
enforceAffinity(
387+
ConnectionUtils.enforceAffinity(
480388
addrs -> {
481389
NativeConnectionWrapper wrapper =
482390
connect(
483391
this.connectionSettings,
484392
connectionName,
485393
disconnectedHandlerReference.get(),
486394
addrs);
487-
this.nativeConnection = wrapper.connection;
395+
this.nativeConnection = wrapper.connection();
488396
return wrapper;
489397
},
490398
this.management,
@@ -772,9 +680,13 @@ String nodename() {
772680
return this.nodename;
773681
}
774682

775-
public Address address() {
683+
Address address() {
776684
return this.address;
777685
}
686+
687+
org.apache.qpid.protonj2.client.Connection connection() {
688+
return this.connection;
689+
}
778690
}
779691

780692
@Override

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Objects;
2626
import java.util.concurrent.ConcurrentHashMap;
2727
import java.util.concurrent.ConcurrentMap;
28+
import java.util.function.Function;
2829
import java.util.stream.Collectors;
2930
import org.slf4j.Logger;
3031
import org.slf4j.LoggerFactory;
@@ -35,6 +36,103 @@ final class ConnectionUtils {
3536

3637
private ConnectionUtils() {}
3738

39+
static AmqpConnection.NativeConnectionWrapper enforceAffinity(
40+
Function<List<Address>, AmqpConnection.NativeConnectionWrapper> connectionFactory,
41+
AmqpManagement management,
42+
ConnectionAffinity affinity,
43+
AffinityCache affinityCache) {
44+
// TODO add retry for sensitive operations in affinity mechanism
45+
if (affinity == null) {
46+
return connectionFactory.apply(null);
47+
}
48+
try {
49+
AmqpConnection.NativeConnectionWrapper pickedConnection = null;
50+
int attemptCount = 0;
51+
boolean queueInfoRefreshed = false;
52+
List<String> nodesWithAffinity = null;
53+
Management.QueueInfo info = affinityCache.queueInfo(affinity.queue());
54+
while (pickedConnection == null) {
55+
attemptCount++;
56+
AmqpConnection.NativeConnectionWrapper connectionWrapper = null;
57+
if (info == null) {
58+
connectionWrapper = connectionFactory.apply(null);
59+
info = lookUpQueueInfo(management, affinity, affinityCache);
60+
queueInfoRefreshed = true;
61+
}
62+
LOGGER.debug(
63+
"Looking affinity with queue '{}' (type = {}, leader = {}, replicas = {})",
64+
info.name(),
65+
info.type(),
66+
info.leader(),
67+
info.replicas());
68+
if (nodesWithAffinity == null) {
69+
nodesWithAffinity = findAffinity(affinity, info);
70+
}
71+
if (connectionWrapper == null) {
72+
List<Address> addressHints =
73+
nodesWithAffinity.stream()
74+
.map(affinityCache::nodenameToAddress)
75+
.filter(Objects::nonNull)
76+
.collect(Collectors.toList());
77+
connectionWrapper = connectionFactory.apply(addressHints);
78+
}
79+
LOGGER.debug("Nodes matching affinity {}: {}.", affinity, nodesWithAffinity);
80+
LOGGER.debug("Currently connected to node {}.", connectionWrapper.nodename());
81+
affinityCache.nodenameToAddress(connectionWrapper.nodename(), connectionWrapper.address());
82+
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
83+
if (!queueInfoRefreshed) {
84+
info = lookUpQueueInfo(management, affinity, affinityCache);
85+
LOGGER.debug(
86+
"Found affinity, but refreshing queue information to check affinity is still valid.");
87+
nodesWithAffinity = findAffinity(affinity, info);
88+
queueInfoRefreshed = true;
89+
if (nodesWithAffinity.contains(connectionWrapper.nodename())) {
90+
pickedConnection = connectionWrapper;
91+
} else {
92+
LOGGER.debug("Affinity no longer valid, retrying.");
93+
management.releaseResources();
94+
connectionWrapper.connection().close();
95+
}
96+
} else {
97+
pickedConnection = connectionWrapper;
98+
}
99+
if (pickedConnection != null) {
100+
LOGGER.debug(
101+
"Affinity found with node {}, returning connection", pickedConnection.nodename());
102+
}
103+
} else if (attemptCount == 5) {
104+
LOGGER.debug(
105+
"Could not find affinity {} after {} attempt(s), using last connection.",
106+
affinity,
107+
attemptCount);
108+
pickedConnection = connectionWrapper;
109+
} else {
110+
LOGGER.debug(
111+
"Affinity {} not found with node {}.", affinity, connectionWrapper.nodename());
112+
if (!queueInfoRefreshed) {
113+
info = lookUpQueueInfo(management, affinity, affinityCache);
114+
nodesWithAffinity = findAffinity(affinity, info);
115+
queueInfoRefreshed = true;
116+
}
117+
management.releaseResources();
118+
connectionWrapper.connection().close();
119+
}
120+
}
121+
return pickedConnection;
122+
} catch (RuntimeException e) {
123+
LOGGER.warn("Cannot enforce affinity {} of error when looking up queue", affinity, e);
124+
throw e;
125+
}
126+
}
127+
128+
private static Management.QueueInfo lookUpQueueInfo(
129+
AmqpManagement management, ConnectionAffinity affinity, AffinityCache cache) {
130+
management.init();
131+
Management.QueueInfo info = management.queueInfo(affinity.queue());
132+
cache.queueInfo(info);
133+
return info;
134+
}
135+
38136
static class AffinityCache {
39137

40138
private final ConcurrentMap<String, Management.QueueInfo> queueInfoCache =

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
2121
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
22-
import static com.rabbitmq.client.amqp.impl.AmqpConnection.enforceAffinity;
22+
import static com.rabbitmq.client.amqp.impl.ConnectionUtils.enforceAffinity;
2323
import static org.assertj.core.api.Assertions.fail;
2424
import static org.mockito.ArgumentMatchers.anyList;
2525
import static org.mockito.Mockito.*;

0 commit comments

Comments
 (0)