Skip to content

Commit c768f89

Browse files
committed
Enforce affinity during recovery
1 parent 607b60f commit c768f89

File tree

7 files changed

+179
-23
lines changed

7 files changed

+179
-23
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,23 @@ static NativeConnectionWrapper enforceAffinity(
282282
affinityCache.nodenameToAddress(connectionWrapper.nodename, connectionWrapper.address);
283283
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
284284
LOGGER.debug("Affinity {} found with node {}", affinity, connectionWrapper.nodename);
285-
pickedConnection = connectionWrapper;
285+
if (!queueInfoRefreshed) {
286+
LOGGER.debug("Refreshing queue information.");
287+
management.init();
288+
info = management.queueInfo(affinity.queue());
289+
affinityCache.queueInfo(info);
290+
nodesWithAffinity = ConnectionUtils.findAffinity(affinity, info);
291+
LOGGER.debug("Nodes matching affinity {}: {}", affinity, nodesWithAffinity);
292+
queueInfoRefreshed = true;
293+
if (nodesWithAffinity.contains(connectionWrapper.nodename)) {
294+
pickedConnection = connectionWrapper;
295+
} else {
296+
management.releaseResources();
297+
connectionWrapper.connection.close();
298+
}
299+
} else {
300+
pickedConnection = connectionWrapper;
301+
}
286302
} else if (attemptCount == 5) {
287303
LOGGER.debug(
288304
"Could not find affinity {} after {} attempt(s), using last connection.",
@@ -389,7 +405,8 @@ private void recoverAfterConnectionFailure(
389405
AmqpException failureCause,
390406
AtomicReference<BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>>
391407
disconnectedHandlerReference) {
392-
LOGGER.info("Connection to {} failed, trying to recover", this.currentConnectionLabel());
408+
LOGGER.info(
409+
"Connection to {} has been disconnected, trying to recover", this.currentConnectionLabel());
393410
this.state(RECOVERING, failureCause);
394411
this.changeStateOfPublishers(RECOVERING, failureCause);
395412
this.changeStateOfConsumers(RECOVERING, failureCause);
@@ -399,9 +416,13 @@ private void recoverAfterConnectionFailure(
399416
this.releaseManagementResources();
400417
try {
401418
this.recoveringConnection.set(true);
402-
this.nativeConnection =
419+
NativeConnectionWrapper ncw =
403420
recoverNativeConnection(
404421
recoveryConfiguration, connectionName, disconnectedHandlerReference);
422+
this.connectionAddress = ncw.address;
423+
this.connectionNodename = ncw.nodename;
424+
this.nativeConnection = ncw.connection;
425+
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
405426
} catch (Exception ex) {
406427
if (ex instanceof InterruptedException) {
407428
Thread.currentThread().interrupt();
@@ -432,7 +453,7 @@ private void recoverAfterConnectionFailure(
432453
}
433454
}
434455

435-
private org.apache.qpid.protonj2.client.Connection recoverNativeConnection(
456+
private NativeConnectionWrapper recoverNativeConnection(
436457
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration,
437458
String connectionName,
438459
AtomicReference<BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>>
@@ -455,11 +476,21 @@ private org.apache.qpid.protonj2.client.Connection recoverNativeConnection(
455476

456477
try {
457478
NativeConnectionWrapper result =
458-
connect(
459-
this.connectionSettings, connectionName, disconnectedHandlerReference.get(), null);
460-
this.connectionAddress = result.address;
461-
LOGGER.debug("Reconnected to {}", this.currentConnectionLabel());
462-
return result.connection;
479+
enforceAffinity(
480+
addrs -> {
481+
NativeConnectionWrapper wrapper =
482+
connect(
483+
this.connectionSettings,
484+
connectionName,
485+
disconnectedHandlerReference.get(),
486+
addrs);
487+
this.nativeConnection = wrapper.connection;
488+
return wrapper;
489+
},
490+
this.management,
491+
this.affinity,
492+
this.environment.affinityCache());
493+
return result;
463494
} catch (Exception ex) {
464495
LOGGER.info("Error while trying to recover connection", ex);
465496
if (!RECOVERY_PREDICATE.test(ex)) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ void init() {
255255
}
256256
} catch (ClientException e) {
257257
java.util.function.Consumer<String> log =
258-
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e);
258+
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.info(m, e);
259259
log.accept("Error while polling AMQP receiver");
260260
}
261261
};

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionAffinityUnitTest.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
2021
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
2122
import static com.rabbitmq.client.amqp.impl.AmqpConnection.enforceAffinity;
2223
import static org.assertj.core.api.Assertions.fail;
@@ -48,6 +49,11 @@ public class AmqpConnectionAffinityUnitTest {
4849
private static final String FOLLOWER2_NODENAME = "f2";
4950
private static final Address FOLLOWER2_ADDRESS = new Address(FOLLOWER2_NODENAME, 5672);
5051
private static final String Q = "my-queue";
52+
private static final Map<String, Address> NODES =
53+
Map.of(
54+
LEADER_NODENAME, LEADER_ADDRESS,
55+
FOLLOWER1_NODENAME, FOLLOWER1_ADDRESS,
56+
FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS);
5157

5258
AutoCloseable mocks;
5359

@@ -71,17 +77,39 @@ void tearDown() throws Exception {
7177
}
7278

7379
@Test
74-
void noInfoLookupIfAlreadyInCache() {
80+
void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldUseConnectionIfMatch() {
7581
cache.queueInfo(info());
82+
when(management.queueInfo(Q)).thenReturn(info());
7683
when(cf.apply(anyList())).thenReturn(leaderConnection());
7784
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
7885
assertThat(w).isLeader();
79-
verifyNoInteractions(management);
86+
verify(management, times(1)).queueInfo(Q);
8087
verify(cf, times(1)).apply(anyList());
8188
verify(nativeConnection, never()).close();
8289
assertThat(cache).contains(info()).hasMapping(LEADER_NODENAME, LEADER_ADDRESS);
8390
}
8491

92+
@Test
93+
void infoInCache_ShouldLookUpInfoAndCheckIt_ShouldRetryIfConnectionDoesNotMatch() {
94+
String initialLeader = LEADER_NODENAME;
95+
when(management.queueInfo(Q)).thenReturn(info(initialLeader));
96+
when(cf.apply(anyList())).thenReturn(leaderConnection());
97+
AmqpConnection.NativeConnectionWrapper w = enforceAffinity(cf, management, affinity(), cache);
98+
assertThat(w).hasNodename(initialLeader);
99+
100+
String newLeader = FOLLOWER1_NODENAME;
101+
// the QQ leader moves to another node for some reason
102+
// the cache is stale, the management is the authority
103+
when(management.queueInfo(Q)).thenReturn(info(newLeader));
104+
when(cf.apply(anyList())).thenReturn(leaderConnection()).thenReturn(follower1Connection());
105+
// we want the returned connection to be on the new leader
106+
w = enforceAffinity(cf, management, affinity(), cache);
107+
assertThat(w).hasNodename(newLeader);
108+
verify(management, times(2)).queueInfo(Q);
109+
verify(cf, times(3)).apply(anyList());
110+
verify(nativeConnection, times(1)).close();
111+
}
112+
85113
@Test
86114
void infoLookupIfNotInCache() {
87115
when(cf.apply(anyList())).thenReturn(leaderConnection());
@@ -145,10 +173,20 @@ AmqpConnection.NativeConnectionWrapper follower2Connection() {
145173
this.nativeConnection, FOLLOWER2_NODENAME, FOLLOWER2_ADDRESS);
146174
}
147175

176+
AmqpConnection.NativeConnectionWrapper connection(String nodename) {
177+
return new AmqpConnection.NativeConnectionWrapper(
178+
this.nativeConnection, nodename, NODES.get(nodename));
179+
}
180+
148181
static ConnectionUtils.ConnectionAffinity affinity() {
149182
return new ConnectionUtils.ConnectionAffinity(Q, ConnectionSettings.Affinity.Operation.PUBLISH);
150183
}
151184

185+
static Management.QueueInfo info(String leader) {
186+
return new TestQueueInfo(
187+
Q, QUORUM, leader, List.of(LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME));
188+
}
189+
152190
static Management.QueueInfo info() {
153191
return info(
154192
Management.QueueType.QUORUM, LEADER_NODENAME, FOLLOWER1_NODENAME, FOLLOWER2_NODENAME);

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,23 @@ public static String rabbitmqctlCommand() {
5353
}
5454
}
5555

56+
static String rabbitmqQueuesCommand() {
57+
String rabbitmqctl = rabbitmqctlCommand();
58+
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
59+
if (lastIndex == -1) {
60+
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
61+
}
62+
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-queues";
63+
}
64+
5665
static ProcessState rabbitmqctl(String command) {
5766
return executeCommand(rabbitmqctlCommand() + " " + command);
5867
}
5968

69+
static ProcessState rabbitmqQueues(String command) {
70+
return executeCommand(rabbitmqQueuesCommand() + " " + command);
71+
}
72+
6073
static ProcessState rabbitmqctlIgnoreError(String command) {
6174
return executeCommand(rabbitmqctlCommand() + " " + command, true);
6275
}
@@ -177,6 +190,14 @@ static boolean exchangeExists(String exchange) {
177190
return Arrays.asList(output.split("\n")).contains(exchange);
178191
}
179192

193+
static void addQuorumQueueMember(String queue, String node) {
194+
rabbitmqQueues(" add_member " + queue + " " + node);
195+
}
196+
197+
static void deleteQuorumQueueMember(String queue, String node) {
198+
rabbitmqQueues(" delete_member " + queue + " " + node);
199+
}
200+
180201
static List<ConnectionInfo> listConnections() {
181202
String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output();
182203
// output (header line presence depends on broker version):

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 58 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.CONSUME;
2121
import static com.rabbitmq.client.amqp.ConnectionSettings.Affinity.Operation.PUBLISH;
22+
import static java.time.Duration.ofMillis;
2223
import static org.assertj.core.api.Assertions.assertThat;
2324

2425
import com.rabbitmq.client.amqp.*;
@@ -32,13 +33,15 @@
3233
@TestUtils.DisabledIfNotCluster
3334
public class ClusterTest {
3435

36+
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofMillis(100));
3537
Environment environment;
3638
Connection connection;
3739
Management management;
38-
String name;
40+
String q, name;
3941

4042
@BeforeEach
4143
void init(TestInfo info) {
44+
this.q = TestUtils.name(info);
4245
this.name = TestUtils.name(info);
4346
environment =
4447
new AmqpEnvironmentBuilder()
@@ -60,24 +63,69 @@ void tearDown() {
6063
@ParameterizedTest
6164
void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) {
6265
try {
63-
management.queue(name).type(type).declare();
64-
AmqpConnection consumeConnection =
65-
connection(b -> b.affinity().queue(name).operation(CONSUME));
66-
AmqpConnection publishConnection =
67-
connection(b -> b.affinity().queue(name).operation(PUBLISH));
68-
Management.QueueInfo info = connection.management().queueInfo(name);
66+
management.queue(q).type(type).declare();
67+
AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME));
68+
AmqpConnection publishConnection = connection(b -> b.affinity().queue(q).operation(PUBLISH));
69+
Management.QueueInfo info = connection.management().queueInfo(q);
6970
assertThat(publishConnection.connectionNodename()).isEqualTo(info.leader());
7071
assertThat(consumeConnection.connectionNodename())
7172
.isIn(info.replicas())
7273
.isNotEqualTo(info.leader());
7374
assertThat(Cli.listConnections()).hasSize(3);
7475
} finally {
75-
management.queueDeletion().delete(name);
76+
management.queueDeletion().delete(q);
77+
}
78+
}
79+
80+
@Test
81+
void connectionShouldRecoverToNewQuorumQueueLeaderAfterAfterItHasMoved() {
82+
try {
83+
management.queue(q).type(Management.QueueType.QUORUM).declare();
84+
Management.QueueInfo info = queueInfo();
85+
String initialLeader = info.leader();
86+
87+
TestUtils.Sync recoveredSync = TestUtils.sync();
88+
AmqpConnection publishConnection =
89+
connection(
90+
b ->
91+
b.name(name)
92+
.listeners(
93+
context -> {
94+
if (context.previousState() == Resource.State.RECOVERING
95+
&& context.currentState() == Resource.State.OPEN) {
96+
recoveredSync.down();
97+
}
98+
})
99+
.affinity()
100+
.queue(q)
101+
.operation(PUBLISH));
102+
assertThat(publishConnection.connectionNodename()).isEqualTo(initialLeader);
103+
104+
int initialReplicaCount = info.replicas().size();
105+
Cli.deleteQuorumQueueMember(q, initialLeader);
106+
TestUtils.waitAtMost(() -> !queueInfo().leader().equals(initialLeader));
107+
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
108+
Cli.addQuorumQueueMember(q, initialLeader);
109+
TestUtils.waitAtMost(() -> queueInfo().replicas().size() == initialReplicaCount);
110+
info = queueInfo();
111+
TestUtils.assertThat(info).doesNotHaveLeader(initialLeader);
112+
String newLeader = info.leader();
113+
114+
Cli.closeConnection(name);
115+
TestUtils.assertThat(recoveredSync).completes();
116+
assertThat(publishConnection.connectionNodename()).isEqualTo(newLeader);
117+
} finally {
118+
management.queueDeletion().delete(q);
76119
}
77120
}
78121

79-
AmqpConnection connection(Consumer<ConnectionBuilder> operation) {
80-
ConnectionBuilder builder = environment.connectionBuilder();
122+
Management.QueueInfo queueInfo() {
123+
return this.management.queueInfo(q);
124+
}
125+
126+
AmqpConnection connection(Consumer<AmqpConnectionBuilder> operation) {
127+
AmqpConnectionBuilder builder = (AmqpConnectionBuilder) environment.connectionBuilder();
128+
builder.recovery().backOffDelayPolicy(BACK_OFF_DELAY_POLICY);
81129
operation.accept(builder);
82130
return (AmqpConnection) builder.build();
83131
}

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -589,6 +589,24 @@ QueueInfoAssert is(Management.QueueType type) {
589589
return this;
590590
}
591591

592+
QueueInfoAssert hasLeader(String leader) {
593+
Assert.notNull(leader, "Expected leader cannot be null");
594+
isNotNull();
595+
if (!leader.equals(actual.leader())) {
596+
fail("Queue leader should be '%s' but is '%s'", leader, actual.leader());
597+
}
598+
return this;
599+
}
600+
601+
QueueInfoAssert doesNotHaveLeader(String leader) {
602+
Assert.notNull(leader, "Leader cannot be null");
603+
isNotNull();
604+
if (leader.equals(actual.leader())) {
605+
fail("Queue leader should not be '%s'", leader);
606+
}
607+
return this;
608+
}
609+
592610
QueueInfoAssert hasArgument(String key, Object value) {
593611
isNotNull();
594612
if (!actual.arguments().containsKey(key) || !actual.arguments().get(key).equals(value)) {

src/test/resources/logback-test.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
</encoder>
66
</appender>
77

8-
<logger name="com.rabbitmq.client.amqp.AmqpClientTestExtension" level="info" />
8+
<logger name="com.rabbitmq.client.amqp.AmqpClientTestExtension" level="warn" />
99
<logger name="com.rabbitmq.client.amqp" level="warn" />
1010
<logger name="com.rabbitmq.client.amqp.impl.EntityRecovery" level="warn" />
1111
<logger name="com.rabbitmq.client.amqp.impl.AmqpConnection" level="warn" />

0 commit comments

Comments
 (0)