Skip to content

Commit 8bf458f

Browse files
committed
Test classic queue affinity across a cluster
1 parent 2052ca5 commit 8bf458f

File tree

3 files changed

+73
-10
lines changed

3 files changed

+73
-10
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/ConnectionUtils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ final class ConnectionUtils {
3636
static final ConnectionSettings.AffinityStrategy PREFER_LEADER_FOR_PUBLISHING_STRATEGY =
3737
new PreferLeaderForPublishingAffinityStrategy();
3838

39+
static final ConnectionSettings.AffinityStrategy MEMBER_AFFINITY_STRATEGY =
40+
new MemberAffinityStrategy();
41+
3942
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionUtils.class);
4043

4144
private ConnectionUtils() {}
@@ -226,6 +229,17 @@ public int hashCode() {
226229
}
227230
}
228231

232+
static class MemberAffinityStrategy implements ConnectionSettings.AffinityStrategy {
233+
234+
@Override
235+
public List<String> nodesWithAffinity(
236+
ConnectionSettings.AffinityContext context, Management.QueueInfo info) {
237+
return (info.replicas() == null || info.replicas().isEmpty())
238+
? Collections.emptyList()
239+
: List.copyOf(info.replicas());
240+
}
241+
}
242+
229243
static class PreferLeaderForPublishingAffinityStrategy
230244
implements ConnectionSettings.AffinityStrategy {
231245

src/main/java/com/rabbitmq/client/amqp/impl/DefaultConnectionSettings.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,7 +413,7 @@ static class DefaultAffinity<T> implements Affinity<T> {
413413
private String queue;
414414
private Operation operation;
415415
private boolean reuse = false;
416-
private AffinityStrategy strategy = ConnectionUtils.PREFER_LEADER_FOR_PUBLISHING_STRATEGY;
416+
private AffinityStrategy strategy = ConnectionUtils.MEMBER_AFFINITY_STRATEGY;
417417

418418
DefaultAffinity(DefaultConnectionSettings<T> connectionSettings) {
419419
this.connectionSettings = connectionSettings;

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME;
2121
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH;
2222
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
23-
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
24-
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
23+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2524
import static java.time.Duration.ofMillis;
2625
import static java.time.Duration.ofSeconds;
26+
import static java.util.stream.Collectors.toList;
27+
import static java.util.stream.IntStream.range;
2728
import static org.assertj.core.api.Assertions.assertThat;
2829

2930
import com.rabbitmq.client.amqp.*;
3031
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
32+
import java.util.Arrays;
3133
import java.util.List;
3234
import java.util.Set;
3335
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +43,8 @@
4143
@TestUtils.DisabledIfNotCluster
4244
public class ClusterTest {
4345

46+
static final String[] URIS =
47+
new String[] {"amqp://localhost:5672", "amqp://localhost:5673", "amqp://localhost:5674"};
4448
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofMillis(100));
4549
Environment environment;
4650
Connection connection;
@@ -49,13 +53,13 @@ public class ClusterTest {
4953

5054
@BeforeEach
5155
void init(TestInfo info) {
52-
this.q = TestUtils.name(info);
53-
this.name = TestUtils.name(info);
56+
this.q = name(info);
57+
this.name = name(info);
5458
environment =
5559
new AmqpEnvironmentBuilder()
5660
.connectionSettings()
5761
.addressSelector(new RoundRobinAddressSelector())
58-
.uris("amqp://localhost:5672", "amqp://localhost:5673", "amqp://localhost:5674")
62+
.uris(URIS)
5963
.environmentBuilder()
6064
.build();
6165
this.connection = environment.connectionBuilder().build();
@@ -311,9 +315,7 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
311315
consumeSync.reset();
312316

313317
List<String> initialFollowers =
314-
queueInfo.replicas().stream()
315-
.filter(n -> !n.equals(initialLeader))
316-
.collect(Collectors.toList());
318+
queueInfo.replicas().stream().filter(n -> !n.equals(initialLeader)).collect(toList());
317319
assertThat(initialFollowers).isNotEmpty();
318320

319321
Cli.pauseNode(initialLeader);
@@ -433,6 +435,48 @@ void consumeFromRestartedStream() {
433435
}
434436
}
435437

438+
@Test
439+
void connectionShouldBeOnOwningNodeWhenAffinityIsActivatedForClassicQueues(TestInfo info) {
440+
List<String> names = range(0, URIS.length).mapToObj(ignored -> name(info)).collect(toList());
441+
try {
442+
List<Connection> connections =
443+
Arrays.stream(URIS)
444+
.map(
445+
uri ->
446+
connection(
447+
b ->
448+
b.uri(uri)
449+
.affinity()
450+
.strategy(ConnectionUtils.MEMBER_AFFINITY_STRATEGY)))
451+
.collect(toList());
452+
List<Management.QueueInfo> queueInfos =
453+
range(0, URIS.length)
454+
.mapToObj(
455+
i ->
456+
connections
457+
.get(i)
458+
.management()
459+
.queue(names.get(i))
460+
.type(Management.QueueType.CLASSIC)
461+
.declare())
462+
.collect(toList());
463+
assertThat(queueInfos.stream().map(Management.QueueInfo::leader).collect(Collectors.toSet()))
464+
.hasSameSizeAs(URIS);
465+
466+
List<AmqpConnection> connectionsWithAffinity =
467+
names.stream().map(n -> connection(b -> b.affinity().queue(n))).collect(toList());
468+
469+
range(0, URIS.length)
470+
.forEach(
471+
i ->
472+
assertThat(connectionsWithAffinity.get(i))
473+
.hasNodename(queueInfos.get(i).leader()));
474+
475+
} finally {
476+
names.forEach(n -> management.queueDeletion().delete(n));
477+
}
478+
}
479+
436480
String moveQqLeader() {
437481
String initialLeader = deleteQqLeader();
438482
addQqMember(initialLeader);
@@ -492,7 +536,12 @@ Management.QueueInfo queueInfo() {
492536

493537
AmqpConnection connection(Consumer<AmqpConnectionBuilder> operation) {
494538
AmqpConnectionBuilder builder = (AmqpConnectionBuilder) environment.connectionBuilder();
495-
builder.recovery().backOffDelayPolicy(BACK_OFF_DELAY_POLICY);
539+
builder
540+
.recovery()
541+
.backOffDelayPolicy(BACK_OFF_DELAY_POLICY)
542+
.connectionBuilder()
543+
.affinity()
544+
.strategy(ConnectionUtils.PREFER_LEADER_FOR_PUBLISHING_STRATEGY);
496545
operation.accept(builder);
497546
return (AmqpConnection) builder.build();
498547
}

0 commit comments

Comments
 (0)