Skip to content

Commit 607b60f

Browse files
committed
Test and fix the affinity algorithm
1 parent 835a63f commit 607b60f

File tree

3 files changed

+354
-190
lines changed

3 files changed

+354
-190
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,10 +257,18 @@ static NativeConnectionWrapper enforceAffinity(
257257
connectionWrapper = connectionFactory.apply(null);
258258
management.init();
259259
info = management.queueInfo(affinity.queue());
260+
affinityCache.queueInfo(info);
260261
queueInfoRefreshed = true;
261262
}
263+
LOGGER.debug(
264+
"Looking affinity with queue '{}' (type = {}, leader = {}, replicas = {})",
265+
info.name(),
266+
info.type(),
267+
info.leader(),
268+
info.replicas());
262269
if (nodesWithAffinity == null) {
263270
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
271+
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
264272
}
265273
if (connectionWrapper == null) {
266274
List<Address> addressHints =
@@ -269,28 +277,31 @@ static NativeConnectionWrapper enforceAffinity(
269277
.filter(Objects::nonNull)
270278
.collect(Collectors.toList());
271279
connectionWrapper = connectionFactory.apply(addressHints);
272-
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
273280
}
274281
LOGGER.debug("Currently connected to node {}", connectionWrapper.nodename);
282+
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
275283
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
276284
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
277285
pickedConnection = connectionWrapper;
278286
} else if (attemptCount == 5) {
279287
LOGGER.debug(
280-
"Could not find affinity {} after {} attempt(s), using last connection",
288+
"Could not find affinity {} after {} attempt(s), using last connection.",
281289
affinity,
282290
attemptCount);
283291
pickedConnection = connectionWrapper;
284292
} else {
285-
LOGGER.debug("Affinity {} not found with node {}", affinity, connectionWrapper.nodename);
293+
LOGGER.debug("Affinity {} not found with node {}.", affinity, connectionWrapper.nodename);
286294
if (!queueInfoRefreshed) {
295+
LOGGER.debug("Refreshing queue information.");
287296
management.init();
288297
info = management.queueInfo(affinity.queue());
289298
affinityCache.queueInfo(info);
299+
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
300+
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
290301
queueInfoRefreshed = true;
291302
}
292-
connectionWrapper.connection.close();
293303
management.releaseResources();
304+
connectionWrapper.connection.close();
294305
}
295306
}
296307
return pickedConnection;

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityTest.java

Lines changed: 0 additions & 186 deletions
This file was deleted.

0 commit comments

Comments
 (0)