Skip to content

Commit c93c80f

Browse files
committed
Add cluster restart test with quorum queues
1 parent 297addb commit c93c80f

File tree

8 files changed

+334
-50
lines changed

8 files changed

+334
-50
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,13 @@
3333

3434
final class ConnectionUtils {
3535

36-
static final ConnectionSettings.AffinityStrategy PREFER_LEADER_FOR_PUBLISHING_STRATEGY =
37-
new PreferLeaderForPublishingAffinityStrategy();
36+
static final ConnectionSettings.AffinityStrategy
37+
LEADER_FOR_PUBLISHING_FOLLOWERS_FOR_CONSUMING_STRATEGY =
38+
new LeaderForPublishingFollowersForConsumingStrategy();
3839

39-
static final ConnectionSettings.AffinityStrategy MEMBER_AFFINITY_STRATEGY =
40-
new MemberAffinityStrategy();
40+
static final ConnectionSettings.AffinityStrategy
41+
LEADER_FOR_PUBLISHING_MEMBERS_FOR_CONSUMING_STRATEGY =
42+
new LeaderForPublishingMembersForConsumingStrategy();
4143

4244
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionUtils.class);
4345

@@ -229,18 +231,28 @@ public int hashCode() {
229231
}
230232
}
231233

232-
static class MemberAffinityStrategy implements ConnectionSettings.AffinityStrategy {
234+
static class LeaderForPublishingMembersForConsumingStrategy
235+
implements ConnectionSettings.AffinityStrategy {
233236

234237
@Override
235238
public List<String> nodesWithAffinity(
236239
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
237-
return (info.replicas() == null || info.replicas().isEmpty())
238-
? Collections.emptyList()
239-
: List.copyOf(info.replicas());
240+
List<String> nodesWithAffinity =
241+
(info.replicas() == null || info.replicas().isEmpty())
242+
? Collections.emptyList()
243+
: List.copyOf(info.replicas());
244+
if (context.operation() == ConnectionSettings.Affinity.Operation.PUBLISH) {
245+
if (info.leader() != null && !info.leader().isBlank()) {
246+
nodesWithAffinity = List.of(info.leader());
247+
} else {
248+
nodesWithAffinity = Collections.emptyList();
249+
}
250+
}
251+
return nodesWithAffinity;
240252
}
241253
}
242254

243-
static class PreferLeaderForPublishingAffinityStrategy
255+
static class LeaderForPublishingFollowersForConsumingStrategy
244256
implements ConnectionSettings.AffinityStrategy {
245257

246258
@Override

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,8 @@ static class DefaultAffinity<T> implements Affinity<T> {
413413
private String queue;
414414
private Operation operation;
415415
private boolean reuse = false;
416-
private AffinityStrategy strategy = ConnectionUtils.MEMBER_AFFINITY_STRATEGY;
416+
private AffinityStrategy strategy =
417+
ConnectionUtils.LEADER_FOR_PUBLISHING_MEMBERS_FOR_CONSUMING_STRATEGY;
417418

418419
DefaultAffinity(DefaultConnectionSettings<T> connectionSettings) {
419420
this.connectionSettings = connectionSettings;

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,6 @@ private static AmqpConnection.NativeConnectionWrapper enforceAffinity(
443443
management,
444444
context,
445445
affinityCache,
446-
ConnectionUtils.PREFER_LEADER_FOR_PUBLISHING_STRATEGY);
446+
ConnectionUtils.LEADER_FOR_PUBLISHING_FOLLOWERS_FOR_CONSUMING_STRATEGY);
447447
}
448448
}

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,9 @@ SyncAssert completes() {
8888
}
8989

9090
SyncAssert completes(Duration timeout) {
91-
try {
92-
boolean completed = actual.await(timeout);
93-
if (!completed) {
94-
fail("Sync timed out after %d ms", timeout.toMillis());
95-
}
96-
} catch (InterruptedException e) {
97-
Thread.interrupted();
98-
throw new RuntimeException(e);
91+
boolean completed = actual.await(timeout);
92+
if (!completed) {
93+
fail("Sync timed out after %d ms", timeout.toMillis());
9994
}
10095
return this;
10196
}

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.function.Predicate;
3333
import java.util.regex.Matcher;
3434
import java.util.regex.Pattern;
35+
import java.util.stream.Collectors;
3536

3637
abstract class Cli {
3738

@@ -273,6 +274,26 @@ static void restartNode(String node) {
273274
executeCommand(dockerCommand + "rabbitmqctl status");
274275
}
275276

277+
static void rebalance() {
278+
rabbitmqQueues("rebalance all");
279+
}
280+
281+
static List<String> nodes() {
282+
List<String> clusterNodes = new ArrayList<>();
283+
clusterNodes.add(rabbitmqctl("eval 'node().'").output().trim());
284+
List<String> nodes =
285+
Arrays.stream(
286+
rabbitmqctl("eval 'nodes().'")
287+
.output()
288+
.replace("[", "")
289+
.replace("]", "")
290+
.split(","))
291+
.map(String::trim)
292+
.collect(Collectors.toList());
293+
clusterNodes.addAll(nodes);
294+
return List.copyOf(clusterNodes);
295+
}
296+
276297
private static String nodeToDockerContainer(String node) {
277298
String containerId = DOCKER_NODES_TO_CONTAINERS.get(node);
278299
Assert.notNull(containerId, "No container for node " + node);

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,9 @@ void connectionShouldBeOnOwningNodeWhenAffinityIsActivatedForClassicQueues(TestI
447447
b ->
448448
b.uri(uri)
449449
.affinity()
450-
.strategy(ConnectionUtils.MEMBER_AFFINITY_STRATEGY)))
450+
.strategy(
451+
ConnectionUtils
452+
.LEADER_FOR_PUBLISHING_MEMBERS_FOR_CONSUMING_STRATEGY)))
451453
.collect(toList());
452454
List<Management.QueueInfo> queueInfos =
453455
range(0, URIS.length)
@@ -541,7 +543,7 @@ AmqpConnection connection(Consumer<AmqpConnectionBuilder> operation) {
541543
.backOffDelayPolicy(BACK_OFF_DELAY_POLICY)
542544
.connectionBuilder()
543545
.affinity()
544-
.strategy(ConnectionUtils.PREFER_LEADER_FOR_PUBLISHING_STRATEGY);
546+
.strategy(ConnectionUtils.LEADER_FOR_PUBLISHING_FOLLOWERS_FOR_CONSUMING_STRATEGY);
545547
operation.accept(builder);
546548
return (AmqpConnection) builder.build();
547549
}

0 commit comments

Comments
 (0)