Skip to content

Commit d864a16

Browse files
committed
Activate Khepri in CI broker
1 parent f3275c4 commit d864a16

File tree

3 files changed

+12
-4
lines changed

3 files changed

+12
-4
lines changed

ci/start-broker.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,6 @@ docker run -d --name rabbitmq \
4545

4646
wait_for_message rabbitmq "completed with"
4747

48+
docker exec rabbitmq rabbitmqctl enable_feature_flag khepri_db
4849
docker exec rabbitmq rabbitmq-diagnostics erlang_version
4950
docker exec rabbitmq rabbitmqctl version

ci/start-cluster.sh

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,8 @@ wait_for_message rabbitmq0 "completed with"
1717

1818
docker exec rabbitmq0 rabbitmqctl await_online_nodes 3
1919

20+
docker exec rabbitmq0 rabbitmqctl enable_feature_flag khepri_db
21+
docker exec rabbitmq1 rabbitmqctl enable_feature_flag khepri_db
22+
docker exec rabbitmq2 rabbitmqctl enable_feature_flag khepri_db
23+
2024
docker exec rabbitmq0 rabbitmqctl cluster_status

src/test/java/com/rabbitmq/client/amqp/impl/ClusterTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
2424
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2525
import static java.time.Duration.ofMillis;
26+
import static java.time.Duration.ofSeconds;
2627
import static org.assertj.core.api.Assertions.assertThat;
2728

2829
import com.rabbitmq.client.amqp.*;
@@ -37,7 +38,7 @@
3738
import org.junit.jupiter.params.ParameterizedTest;
3839
import org.junit.jupiter.params.provider.EnumSource;
3940

40-
@TestUtils.DisabledIfNotCluster
41+
// @TestUtils.DisabledIfNotCluster
4142
public class ClusterTest {
4243

4344
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy.fixed(ofMillis(100));
@@ -319,13 +320,16 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
319320
nodePaused = true;
320321

321322
publisher.publish(publisher.message().messageId(2L), ctx -> publishSync.down());
322-
assertThat(publishSync).completes();
323+
324+
assertThat(publishSync).completes(ofSeconds(20));
323325
publishSync.reset();
324326

325327
assertThat(consumeSync).completes();
326328
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L);
327329
consumeSync.reset();
328330

331+
assertThat(initialFollowers).contains(mgmt.queueInfo(q).leader());
332+
329333
Cli.unpauseNode(initialLeader);
330334
nodePaused = false;
331335

@@ -335,7 +339,6 @@ void consumeFromQuorumQueueWhenLeaderIsPaused() {
335339

336340
assertThat(consumeSync).completes();
337341
assertThat(messageIds).containsExactlyInAnyOrder(1L, 2L, 3L);
338-
consumeSync.reset();
339342

340343
waitAtMost(() -> initialFollowers.contains(mgmt.queueInfo(q).leader()));
341344
} finally {
@@ -451,7 +454,7 @@ String deleteLeader(Consumer<String> deleteMemberOperation) {
451454
String initialLeader = info.leader();
452455
int initialReplicaCount = info.replicas().size();
453456
deleteMemberOperation.accept(initialLeader);
454-
TestUtils.waitAtMost(() -> !queueInfo().leader().equals(initialLeader));
457+
TestUtils.waitAtMost(() -> !initialLeader.equals(queueInfo().leader()));
455458
assertThat(queueInfo().replicas()).hasSize(initialReplicaCount - 1);
456459
return initialLeader;
457460
}

0 commit comments

Comments
 (0)