Skip to content

Commit 297addb

Browse files
committed
Add simple test with rolling cluster restart
1 parent 8bf458f commit 297addb

File tree

5 files changed

+128
-7
lines changed

5 files changed

+128
-7
lines changed

.github/workflows/test-pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,6 @@ jobs:
4141
- name: Start cluster
4242
run: ci/start-cluster.sh
4343
- name: Test against cluster
44-
run: ./mvnw test -Dtest=ClusterTest -Drabbitmqctl.bin=DOCKER:rabbitmq0
44+
run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
4545
- name: Stop cluster
4646
run: docker compose --file ci/cluster/docker-compose.yml down

.github/workflows/test-supported-java-versions.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,6 @@ jobs:
4949
- name: Start cluster
5050
run: ci/start-cluster.sh
5151
- name: Test against cluster
52-
run: ./mvnw test -Dtest=ClusterTest -Drabbitmqctl.bin=DOCKER:rabbitmq0
52+
run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
5353
- name: Stop cluster
5454
run: docker compose --file ci/cluster/docker-compose.yml down

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ jobs:
4747
- name: Start cluster
4848
run: ci/start-cluster.sh
4949
- name: Test against cluster
50-
run: ./mvnw test -Dtest=ClusterTest -Drabbitmqctl.bin=DOCKER:rabbitmq0
50+
run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
5151
- name: Stop cluster
5252
run: docker compose --file ci/cluster/docker-compose.yml down
5353
- name: Publish snapshot

src/test/java/com/rabbitmq/client/amqp/impl/Cli.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ private Cli() {}
4444
Map.of(
4545
"rabbit@node0", "rabbitmq0",
4646
"rabbit@node1", "rabbitmq1",
47-
"rabbit@node2", "rabbitmq3");
47+
"rabbit@node2", "rabbitmq2");
4848

4949
public static String rabbitmqctlCommand() {
5050
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
5151
if (rabbitmqCtl == null) {
52-
throw new IllegalStateException("Please define the rabbitmqctl.bin system property");
52+
rabbitmqCtl = "DOCKER:rabbitmq";
5353
}
5454
if (rabbitmqCtl.startsWith(DOCKER_PREFIX)) {
5555
String containerId = rabbitmqCtl.split(":")[1];
@@ -59,7 +59,7 @@ public static String rabbitmqctlCommand() {
5959
}
6060
}
6161

62-
static String rabbitmqQueuesCommand() {
62+
private static String rabbitmqQueuesCommand() {
6363
String rabbitmqctl = rabbitmqctlCommand();
6464
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
6565
if (lastIndex == -1) {
@@ -68,6 +68,15 @@ static String rabbitmqQueuesCommand() {
6868
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-queues";
6969
}
7070

71+
private static String rabbitmqUpgradeCommand() {
72+
String rabbitmqctl = rabbitmqctlCommand();
73+
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
74+
if (lastIndex == -1) {
75+
throw new IllegalArgumentException("Not a valid rabbitqmctl command: " + rabbitmqctl);
76+
}
77+
return rabbitmqctl.substring(0, lastIndex) + "rabbitmq-upgrade";
78+
}
79+
7180
static String rabbitmqStreamsCommand() {
7281
String rabbitmqctl = rabbitmqctlCommand();
7382
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
@@ -89,6 +98,10 @@ static ProcessState rabbitmqStreams(String command) {
8998
return executeCommand(rabbitmqStreamsCommand() + " " + command);
9099
}
91100

101+
static ProcessState rabbitmqUpgrade(String command) {
102+
return executeCommand(rabbitmqUpgradeCommand() + " " + command);
103+
}
104+
92105
static ProcessState rabbitmqctlIgnoreError(String command) {
93106
return executeCommand(rabbitmqctlCommand() + " " + command, true);
94107
}
@@ -236,9 +249,34 @@ static void pauseNode(String node) {
236249
}
237250

238251
static void unpauseNode(String node) {
252+
executeCommand("docker unpause " + nodeToDockerContainer(node));
253+
}
254+
255+
static void restartNode(String node) {
256+
String container = nodeToDockerContainer(node);
257+
String dockerCommand = "docker exec " + container + " ";
258+
String rabbitmqUpgradeCommand = dockerCommand + "rabbitmq-upgrade ";
259+
executeCommand(rabbitmqUpgradeCommand + "await_online_quorum_plus_one -t 300");
260+
executeCommand(rabbitmqUpgradeCommand + "drain");
261+
executeCommand("docker stop " + container);
262+
executeCommand("docker start " + container);
263+
String otherContainer =
264+
DOCKER_NODES_TO_CONTAINERS.values().stream()
265+
.filter(c -> !c.endsWith(container))
266+
.findAny()
267+
.get();
268+
executeCommand(
269+
"docker exec "
270+
+ otherContainer
271+
+ " rabbitmqctl await_online_nodes "
272+
+ DOCKER_NODES_TO_CONTAINERS.size());
273+
executeCommand(dockerCommand + "rabbitmqctl status");
274+
}
275+
276+
private static String nodeToDockerContainer(String node) {
239277
String containerId = DOCKER_NODES_TO_CONTAINERS.get(node);
240278
Assert.notNull(containerId, "No container for node " + node);
241-
executeCommand("docker unpause " + containerId);
279+
return containerId;
242280
}
243281

244282
static List<ConnectionInfo> listConnections() {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
21+
import static java.time.Duration.ofSeconds;
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
import com.rabbitmq.client.amqp.*;
25+
import java.util.List;
26+
import org.junit.jupiter.api.*;
27+
28+
@DisabledIfNotCluster
29+
public class RecoveryClusterTest {
30+
31+
static final String[] URIS =
32+
new String[] {"amqp://localhost:5672", "amqp://localhost:5673", "amqp://localhost:5674"};
33+
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofSeconds(1));
34+
Environment environment;
35+
Connection connection;
36+
Management management;
37+
38+
@BeforeEach
39+
void init() {
40+
environment =
41+
new AmqpEnvironmentBuilder().connectionSettings().uris(URIS).environmentBuilder().build();
42+
this.connection =
43+
environment
44+
.connectionBuilder()
45+
.recovery()
46+
.backOffDelayPolicy(BACK_OFF_DELAY_POLICY)
47+
.connectionBuilder()
48+
.build();
49+
this.management = connection.management();
50+
}
51+
52+
@AfterEach
53+
void tearDown() {
54+
environment.close();
55+
}
56+
57+
@Test
58+
void clusterRestart(TestInfo info) {
59+
String name = TestUtils.name(info);
60+
try {
61+
Management.QueueInfo queueInfo =
62+
management.queue(name).type(Management.QueueType.QUORUM).declare();
63+
List<String> replicas = queueInfo.replicas();
64+
replicas.forEach(Cli::restartNode);
65+
66+
waitAtMost(
67+
() -> {
68+
try {
69+
management.queueInfo(name);
70+
return true;
71+
} catch (Exception e) {
72+
return false;
73+
}
74+
});
75+
assertThat(management.queueInfo(name).replicas())
76+
.hasSameSizeAs(replicas)
77+
.containsExactlyInAnyOrderElementsOf(replicas);
78+
79+
} finally {
80+
management.queueDeletion().delete(name);
81+
}
82+
}
83+
}

0 commit comments

Comments
 (0)