Skip to content

Commit f3275c4

Browse files
committed
Add test to consume from quorum queue while leader is paused
1 parent 8c344c8 commit f3275c4

File tree

2 files changed

+94
-1
lines changed

2 files changed

+94
-1
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.ArrayList;
2828
import java.util.Arrays;
2929
import java.util.List;
30+
import java.util.Map;
3031
import java.util.concurrent.Callable;
3132
import java.util.function.Predicate;
3233
import java.util.regex.Matcher;
@@ -39,6 +40,11 @@ private Cli() {}
3940
private static final Pattern CONNECTION_NAME_PATTERN =
4041
Pattern.compile("\"connection_name\",\"(?<name>[a-zA-Z0-9_\\-]+)?\"");
4142
private static final String DOCKER_PREFIX = "DOCKER:";
43+
private static final Map<String, String> DOCKER_NODES_TO_CONTAINERS =
44+
Map.of(
45+
"rabbit@node0", "rabbitmq0",
46+
"rabbit@node1", "rabbitmq1",
47+
"rabbit@node2", "rabbitmq3");
4248

4349
public static String rabbitmqctlCommand() {
4450
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
@@ -223,6 +229,18 @@ static void restartStream(String stream) {
223229
rabbitmqStreams(" restart_stream " + stream);
224230
}
225231

232+
static void pauseNode(String node) {
233+
String containerId = DOCKER_NODES_TO_CONTAINERS.get(node);
234+
Assert.notNull(containerId, "No container for node " + node);
235+
executeCommand("docker pause " + containerId);
236+
}
237+
238+
static void unpauseNode(String node) {
239+
String containerId = DOCKER_NODES_TO_CONTAINERS.get(node);
240+
Assert.notNull(containerId, "No container for node " + node);
241+
executeCommand("docker unpause " + containerId);
242+
}
243+
226244
static List<ConnectionInfo> listConnections() {
227245
String output = rabbitmqctl("list_connections -q pid peer_port client_properties").output();
228246
// output (header line presence depends on broker version):

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ConcurrentHashMap;
3333
import java.util.concurrent.atomic.AtomicInteger;
3434
import java.util.function.Consumer;
35+
import java.util.stream.Collectors;
3536
import org.junit.jupiter.api.*;
3637
import org.junit.jupiter.params.ParameterizedTest;
3738
import org.junit.jupiter.params.provider.EnumSource;
@@ -213,7 +214,7 @@ void consumeFromMovingQq() {
213214
}
214215

215216
@Test
216-
void publishConsumeQqWhenLeaderChanges() {
217+
void publishConsumeQuorumQueueWhenLeaderChanges() {
217218
try {
218219
management.queue(q).type(Management.QueueType.QUORUM).declare();
219220

@@ -271,6 +272,80 @@ void publishConsumeQqWhenLeaderChanges() {
271272
}
272273
}
273274

275+
@Test
276+
void consumeFromQuorumQueueWhenLeaderIsPaused() {
277+
management.queue(q).type(Management.QueueType.QUORUM).declare();
278+
Management.QueueInfo queueInfo = queueInfo();
279+
String initialLeader = queueInfo.leader();
280+
boolean nodePaused = false;
281+
try {
282+
AmqpConnection consumeConnection = connection(b -> b.affinity().queue(q).operation(CONSUME));
283+
assertThat(consumeConnection).isOnFollower(queueInfo());
284+
Management mgmt = consumeConnection.management();
285+
286+
Set<Long> messageIds = ConcurrentHashMap.newKeySet();
287+
Sync consumeSync = sync();
288+
consumeConnection
289+
.consumerBuilder()
290+
.queue(q)
291+
.messageHandler(
292+
(ctx, msg) -> {
293+
messageIds.add(msg.messageIdAsLong());
294+
consumeSync.down();
295+
ctx.accept();
296+
})
297+
.build();
298+
299+
AmqpConnection publishConnection = connection(b -> b.affinity().queue(q).operation(CONSUME));
300+
Publisher publisher = publishConnection.publisherBuilder().queue(q).build();
301+
assertThat(publishConnection).isOnFollower(queueInfo());
302+
303+
Sync publishSync = sync();
304+
publisher.publish(publisher.message().messageId(1L), ctx -> publishSync.down());
305+
assertThat(publishSync).completes();
306+
publishSync.reset();
307+
308+
assertThat(consumeSync).completes();
309+
assertThat(messageIds).containsExactlyInAnyOrder(1L);
310+
consumeSync.reset();
311+
312+
List<String> initialFollowers =
313+
queueInfo.replicas().stream()
314+
.filter(n -> !n.equals(initialLeader))
315+
.collect(Collectors.toList());
316+
assertThat(initialFollowers).isNotEmpty();
317+
318+
Cli.pauseNode(initialLeader);
319+
nodePaused = true;
320+
321+
publisher.publish(publisher.message().messageId(2L), ctx -> publishSync.down());
322+
assertThat(publishSync).completes();
323+
publishSync.reset();
324+
325+
assertThat(consumeSync).completes();
326+
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L);
327+
consumeSync.reset();
328+
329+
Cli.unpauseNode(initialLeader);
330+
nodePaused = false;
331+
332+
publisher.publish(publisher.message().messageId(3L), ctx -> publishSync.down());
333+
assertThat(publishSync).completes();
334+
publishSync.reset();
335+
336+
assertThat(consumeSync).completes();
337+
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L, 3L);
338+
consumeSync.reset();
339+
340+
waitAtMost(() -> initialFollowers.contains(mgmt.queueInfo(q).leader()));
341+
} finally {
342+
if (nodePaused) {
343+
Cli.unpauseNode(initialLeader);
344+
}
345+
management.queueDeletion().delete(q);
346+
}
347+
}
348+
274349
@Test
275350
void publishToRestartedStream() {
276351
try {

0 commit comments

Comments
 (0)