Skip to content

Commit bedf745

Browse files
committed
Use dedicated scheduler for locator recovery
1 parent 0b92a79 commit bedf745

File tree

6 files changed

+60
-78
lines changed

6 files changed

+60
-78
lines changed

.github/workflows/test-pr.yml

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,38 +11,38 @@ jobs:
1111

1212
steps:
1313
- uses: actions/checkout@v4
14-
# - name: Checkout tls-gen
15-
# uses: actions/checkout@v4
16-
# with:
17-
# repository: rabbitmq/tls-gen
18-
# path: './tls-gen'
14+
- name: Checkout tls-gen
15+
uses: actions/checkout@v4
16+
with:
17+
repository: rabbitmq/tls-gen
18+
path: './tls-gen'
1919
- name: Set up JDK
2020
uses: actions/setup-java@v4
2121
with:
2222
distribution: 'temurin'
2323
java-version: '21'
2424
cache: 'maven'
25-
# - name: Start broker
26-
# run: ci/start-broker.sh
27-
# - name: Test (no dynamic-batch publishing)
28-
# run: |
29-
# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
30-
# -Drabbitmq.stream.producer.dynamic.batch=false \
31-
# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
32-
# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
33-
# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
34-
# - name: Test (dynamic-batch publishing)
35-
# run: |
36-
# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
37-
# -Drabbitmq.stream.producer.dynamic.batch=true \
38-
# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
39-
# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
40-
# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
41-
# - name: Stop broker
42-
# run: docker stop rabbitmq && docker rm rabbitmq
25+
- name: Start broker
26+
run: ci/start-broker.sh
27+
- name: Test (no dynamic-batch publishing)
28+
run: |
29+
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
30+
-Drabbitmq.stream.producer.dynamic.batch=false \
31+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
32+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
33+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
34+
- name: Test (dynamic-batch publishing)
35+
run: |
36+
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
37+
-Drabbitmq.stream.producer.dynamic.batch=true \
38+
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
39+
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
40+
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
41+
- name: Stop broker
42+
run: docker stop rabbitmq && docker rm rabbitmq
4343
- name: Start cluster
4444
run: ci/start-cluster.sh
4545
- name: Test against cluster
46-
run: ./mvnw test -Dtest="RecoveryClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
46+
run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
4747
- name: Stop cluster
4848
run: docker compose --file ci/cluster/docker-compose.yml down

src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ class StreamEnvironment implements Environment {
6666

6767
private final EventLoopGroup eventLoopGroup;
6868
private final ScheduledExecutorService scheduledExecutorService;
69+
private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
6970
private final boolean privateScheduleExecutorService;
7071
private final Client.ClientParameters clientParametersPrototype;
7172
private final List<Address> addresses;
@@ -235,17 +236,22 @@ class StreamEnvironment implements Environment {
235236
maxProducersByConnection,
236237
maxTrackingConsumersByConnection,
237238
connectionNamingStrategy,
238-
Utils.coordinatorClientFactory(this, producerNodeRetryDelay),
239+
coordinatorClientFactory(this, producerNodeRetryDelay),
239240
forceLeaderForProducers);
240241
this.consumersCoordinator =
241242
new ConsumersCoordinator(
242243
this,
243244
maxConsumersByConnection,
244245
connectionNamingStrategy,
245-
Utils.coordinatorClientFactory(this, consumerNodeRetryDelay),
246+
coordinatorClientFactory(this, consumerNodeRetryDelay),
246247
forceReplicaForConsumers,
247248
Utils.brokerPicker());
248249
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
250+
251+
ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
252+
this.locatorReconnectionScheduledExecutorService =
253+
Executors.newScheduledThreadPool(this.locators.size(), threadFactory);
254+
249255
ClientParameters clientParametersForInit = locatorParametersCopy();
250256
Runnable locatorInitSequence =
251257
() -> {
@@ -291,7 +297,7 @@ class StreamEnvironment implements Environment {
291297
l,
292298
connectionNamingStrategy,
293299
clientFactory,
294-
this.scheduledExecutorService,
300+
this.locatorReconnectionScheduledExecutorService,
295301
this.recoveryBackOffDelayPolicy,
296302
l.label());
297303
}
@@ -338,7 +344,7 @@ private ShutdownListener shutdownListener(
338344
locator,
339345
connectionNamingStrategy,
340346
clientFactory,
341-
this.scheduledExecutorService,
347+
this.locatorReconnectionScheduledExecutorService,
342348
delayPolicy,
343349
label);
344350
} else {
@@ -683,6 +689,9 @@ public void close() {
683689
if (privateScheduleExecutorService) {
684690
this.scheduledExecutorService.shutdownNow();
685691
}
692+
if (this.locatorReconnectionScheduledExecutorService != null) {
693+
this.locatorReconnectionScheduledExecutorService.shutdownNow();
694+
}
686695
try {
687696
if (this.eventLoopGroup != null
688697
&& (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
@@ -904,9 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer(
904913
@Override
905914
public String toString() {
906915
return "{ \"locators\" : ["
907-
+ this.locators.stream()
908-
.map(l -> quote(l.label()))
909-
.collect(Collectors.joining(","))
916+
+ this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(","))
910917
+ "], "
911918
+ Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount())
912919
+ ","

src/test/java/com/rabbitmq/stream/Cli.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() {
252252
}
253253
}
254254

255+
private static String dockerContainer() {
256+
if (rabbitmqctlCommand().startsWith("docker")) {
257+
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
258+
return rabbitmqCtl.split(":")[1];
259+
} else {
260+
throw new IllegalStateException("Broker does not run on broker");
261+
}
262+
}
263+
255264
private static String rabbitmqStreamsCommand() {
256265
String rabbitmqctl = rabbitmqctlCommand();
257266
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
@@ -354,6 +363,12 @@ public static void restartNode(String node) {
354363
executeCommand(dockerCommand + "rabbitmqctl status");
355364
}
356365

366+
public static void restartBrokerContainer() {
367+
String container = dockerContainer();
368+
executeCommand("docker stop " + container);
369+
executeCommand("docker start " + container);
370+
}
371+
357372
public static void rebalance() {
358373
rabbitmqQueues("rebalance all");
359374
}

src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,9 @@ public class RecoveryClusterTest {
7272
static List<Level> logLevels;
7373
static List<Class<?>> logClasses =
7474
List.of(
75-
// ProducersCoordinator.class,
76-
// ConsumersCoordinator.class,
77-
AsyncRetry.class,
78-
StreamEnvironment.class,
79-
ScheduledExecutorServiceWrapper.class);
75+
// ProducersCoordinator.class,
76+
// ConsumersCoordinator.class,
77+
AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class);
8078
ScheduledExecutorService scheduledExecutorService;
8179

8280
@BeforeAll
@@ -125,9 +123,9 @@ static void tearDownAll() {
125123

126124
@ParameterizedTest
127125
@CsvSource({
128-
// "false,false",
126+
"false,false",
129127
"true,true",
130-
// "true,false",
128+
"true,false",
131129
})
132130
void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException {
133131
LOGGER.info(

src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,9 @@
1414
1515
package com.rabbitmq.stream.impl;
1616

17+
import static com.rabbitmq.stream.impl.TestUtils.*;
1718
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
1819
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
19-
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
20-
import static com.rabbitmq.stream.impl.TestUtils.localhost;
21-
import static com.rabbitmq.stream.impl.TestUtils.localhostTls;
22-
import static com.rabbitmq.stream.impl.TestUtils.streamName;
23-
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
2420
import static java.util.concurrent.TimeUnit.SECONDS;
2521
import static java.util.stream.Collectors.toList;
2622
import static java.util.stream.IntStream.range;
@@ -57,7 +53,6 @@
5753
import io.netty.channel.EventLoopGroup;
5854
import io.netty.channel.epoll.EpollEventLoopGroup;
5955
import io.netty.channel.epoll.EpollSocketChannel;
60-
import io.netty.channel.nio.NioEventLoopGroup;
6156
import io.netty.handler.ssl.SslHandler;
6257
import java.net.ConnectException;
6358
import java.nio.charset.StandardCharsets;
@@ -92,22 +87,11 @@
9287
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
9388
public class StreamEnvironmentTest {
9489

95-
static EventLoopGroup eventLoopGroup;
96-
9790
EnvironmentBuilder environmentBuilder;
9891

9992
String stream;
10093
TestUtils.ClientFactory cf;
101-
102-
@BeforeAll
103-
static void initAll() {
104-
eventLoopGroup = new NioEventLoopGroup();
105-
}
106-
107-
@AfterAll
108-
static void afterAll() throws Exception {
109-
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
110-
}
94+
EventLoopGroup eventLoopGroup;
11195

11296
@BeforeEach
11397
void init() {

src/test/java/com/rabbitmq/stream/impl/TlsTest.java

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,35 +56,13 @@
5656
import javax.net.ssl.SSLException;
5757
import javax.net.ssl.SSLHandshakeException;
5858
import javax.net.ssl.SSLParameters;
59-
import org.junit.jupiter.api.AfterEach;
60-
import org.junit.jupiter.api.BeforeEach;
6159
import org.junit.jupiter.api.Test;
6260
import org.junit.jupiter.api.extension.ExtendWith;
6361

6462
@DisabledIfTlsNotEnabled
6563
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
6664
public class TlsTest {
6765

68-
static boolean isJava13() {
69-
String javaVersion = System.getProperty("java.version");
70-
return javaVersion != null && javaVersion.startsWith("13.");
71-
}
72-
73-
@BeforeEach
74-
public void init() {
75-
if (isJava13()) {
76-
// for Java 13.0.7, see https://github.com/bcgit/bc-java/issues/941
77-
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "PBEWithHmacSHA256AndAES_256");
78-
}
79-
}
80-
81-
@AfterEach
82-
public void tearDown() throws Exception {
83-
if (isJava13()) {
84-
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "");
85-
}
86-
}
87-
8866
String stream;
8967

9068
TestUtils.ClientFactory cf;

0 commit comments

Comments
 (0)