Skip to content

Commit dfbeab2

Browse files
committed
Rationalize thread pool executor usage
4 thread pools: - executor service for internal tasks (e.g. run connection recovery) - scheduled executor service for internal scheduled tasks (e.g. schedule a connection retry) - publisher executor service to poll settlement futures - consumer executor service to poll receivers The tasks to run are different in nature, it explains the different executor services to configure. The library uses sensible defaults (e.g virtual threads if available for the publisher executor service, where there is a short task for each message; platform threads for the consumer executor, where tasks are long-running). Virtual threads do not seem appropriate for long-running tasks, especially when testing against cluster rolling restart.
1 parent 94e0f5b commit dfbeab2

File tree

8 files changed

+57
-31
lines changed

8 files changed

+57
-31
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -558,10 +558,6 @@ AmqpEnvironment environment() {
558558
return this.environment;
559559
}
560560

561-
ExecutorService executorService() {
562-
return this.environment.executorService();
563-
}
564-
565561
ScheduledExecutorService scheduledExecutorService() {
566562
return this.environment.scheduledExecutorService();
567563
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
206206

207207
private void startReceivingLoop() {
208208
Runnable receiveTask = createReceiveTask(nativeReceiver, messageHandler);
209-
this.receiveLoop = this.connection.executorService().submit(receiveTask);
209+
this.receiveLoop = this.connection.environment().consumerExecutorService().submit(receiveTask);
210210
}
211211

212212
void recoverAfterConnectionFailure() {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 37 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,28 +32,33 @@ class AmqpEnvironment implements Environment {
3232
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
3333

3434
private final Client client;
35-
private final ExecutorService executorService;
3635
private final DefaultConnectionSettings<?> connectionSettings =
3736
DefaultConnectionSettings.instance();
3837
private final AtomicBoolean closed = new AtomicBoolean(false);
3938
private final boolean internalExecutor;
4039
private final boolean internalScheduledExecutor;
40+
private final boolean internalConsumerExecutor;
41+
private final boolean internalPublisherExecutor;
42+
private final ExecutorService executorService;
43+
private final ScheduledExecutorService scheduledExecutorService;
44+
private final ExecutorService publisherExecutorService;
45+
private final ExecutorService consumerExecutorService;
4146
private final ConnectionManager connectionManager = new ConnectionManager(this);
4247
private final long id;
4348
private final Clock clock = new Clock();
44-
private final ScheduledExecutorService scheduledExecutorService;
4549
private volatile ScheduledFuture<?> clockRefreshFuture;
4650
private final AtomicBoolean clockRefreshSet = new AtomicBoolean(false);
4751
private final MetricsCollector metricsCollector;
4852
private final ObservationCollector observationCollector;
4953
private final ConnectionUtils.AffinityCache affinityCache = new ConnectionUtils.AffinityCache();
5054
private final EventLoop recoveryEventLoop;
5155
private final ExecutorService recoveryEventLoopExecutorService;
52-
private final ExecutorService managementExecutorService;
5356

5457
AmqpEnvironment(
5558
ExecutorService executorService,
5659
ScheduledExecutorService scheduledExecutorService,
60+
ExecutorService publisherExecutorService,
61+
ExecutorService consumerExecutorService,
5762
DefaultConnectionSettings<?> connectionSettings,
5863
MetricsCollector metricsCollector,
5964
ObservationCollector observationCollector) {
@@ -65,7 +70,7 @@ class AmqpEnvironment implements Environment {
6570

6671
String threadPrefix = String.format("rabbitmq-amqp-environment-%d-", this.id);
6772
if (executorService == null) {
68-
this.executorService = Utils.executorService(threadPrefix);
73+
this.executorService = Executors.newCachedThreadPool(Utils.threadFactory(threadPrefix));
6974
this.internalExecutor = true;
7075
} else {
7176
this.executorService = executorService;
@@ -79,6 +84,21 @@ class AmqpEnvironment implements Environment {
7984
this.scheduledExecutorService = scheduledExecutorService;
8085
this.internalScheduledExecutor = false;
8186
}
87+
if (publisherExecutorService == null) {
88+
this.publisherExecutorService = Utils.executorService(threadPrefix);
89+
this.internalPublisherExecutor = true;
90+
} else {
91+
this.publisherExecutorService = publisherExecutorService;
92+
this.internalPublisherExecutor = false;
93+
}
94+
if (consumerExecutorService == null) {
95+
this.consumerExecutorService =
96+
Executors.newCachedThreadPool(Utils.threadFactory(threadPrefix + "consumer-"));
97+
this.internalConsumerExecutor = true;
98+
} else {
99+
this.consumerExecutorService = consumerExecutorService;
100+
this.internalConsumerExecutor = false;
101+
}
82102
this.metricsCollector =
83103
metricsCollector == null ? NoOpMetricsCollector.INSTANCE : metricsCollector;
84104
this.observationCollector =
@@ -92,14 +112,6 @@ class AmqpEnvironment implements Environment {
92112
new LinkedBlockingQueue<>(),
93113
Utils.threadFactory(threadPrefix + "event-loop-"));
94114
this.recoveryEventLoop = new EventLoop(this.recoveryEventLoopExecutorService);
95-
this.managementExecutorService =
96-
new ThreadPoolExecutor(
97-
0,
98-
Integer.MAX_VALUE,
99-
30,
100-
TimeUnit.SECONDS,
101-
new SynchronousQueue<>(),
102-
Utils.threadFactory(threadPrefix + "management-loop-"));
103115
}
104116

105117
DefaultConnectionSettings<?> connectionSettings() {
@@ -111,6 +123,7 @@ Client client() {
111123
}
112124

113125
Clock clock() {
126+
// FIXME set clock on demand
114127
if (this.clockRefreshSet.compareAndSet(false, true)) {
115128
this.clockRefreshFuture =
116129
this.scheduledExecutorService.scheduleAtFixedRate(
@@ -126,13 +139,18 @@ public void close() {
126139
this.client.close();
127140
this.recoveryEventLoop.close();
128141
this.recoveryEventLoopExecutorService.shutdownNow();
129-
this.managementExecutorService.shutdownNow();
130142
if (this.internalExecutor) {
131143
this.executorService.shutdownNow();
132144
}
133145
if (this.internalScheduledExecutor) {
134146
this.scheduledExecutorService.shutdownNow();
135147
}
148+
if (this.internalPublisherExecutor) {
149+
this.publisherExecutorService.shutdownNow();
150+
}
151+
if (this.internalConsumerExecutor) {
152+
this.consumerExecutorService.shutdownNow();
153+
}
136154
if (this.clockRefreshFuture != null) {
137155
this.clockRefreshFuture.cancel(false);
138156
}
@@ -149,8 +167,12 @@ ExecutorService executorService() {
149167
return this.executorService;
150168
}
151169

152-
ExecutorService managementExecutorService() {
153-
return this.managementExecutorService;
170+
ExecutorService publisherExecutorService() {
171+
return this.publisherExecutorService;
172+
}
173+
174+
ExecutorService consumerExecutorService() {
175+
return this.consumerExecutorService;
154176
}
155177

156178
ScheduledExecutorService scheduledExecutorService() {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
3333
new DefaultEnvironmentConnectionSettings(this);
3434
private ExecutorService executorService;
3535
private ScheduledExecutorService scheduledExecutorService;
36+
private ExecutorService publisherExecutorService;
37+
private ExecutorService consumerExecutorService;
3638
private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE;
3739
private ObservationCollector observationCollector = Utils.NO_OP_OBSERVATION_COLLECTOR;
3840

@@ -49,6 +51,16 @@ public AmqpEnvironmentBuilder scheduledExecutorService(
4951
return this;
5052
}
5153

54+
public AmqpEnvironmentBuilder publisherExecutorService(ExecutorService publisherExecutorService) {
55+
this.publisherExecutorService = publisherExecutorService;
56+
return this;
57+
}
58+
59+
public AmqpEnvironmentBuilder consumerExecutorService(ExecutorService consumerExecutorService) {
60+
this.consumerExecutorService = consumerExecutorService;
61+
return this;
62+
}
63+
5264
public AmqpEnvironmentBuilder metricsCollector(MetricsCollector metricsCollector) {
5365
this.metricsCollector = metricsCollector;
5466
return this;
@@ -69,6 +81,8 @@ public Environment build() {
6981
return new AmqpEnvironment(
7082
executorService,
7183
scheduledExecutorService,
84+
publisherExecutorService,
85+
consumerExecutorService,
7286
connectionSettings,
7387
metricsCollector,
7488
observationCollector);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,8 +368,7 @@ OutstandingRequest request(Message<?> request, UUID requestId) throws ClientExce
368368
if (loop == null) {
369369
Runnable receiveTask = receiveTask();
370370
LOGGER.debug("Starting management receive loop ({}).", this);
371-
this.receiveLoop =
372-
this.connection.environment().managementExecutorService().submit(receiveTask);
371+
this.receiveLoop = this.connection.environment().executorService().submit(receiveTask);
373372
LOGGER.debug("Management initialized ({}).", this);
374373
}
375374
} finally {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
5757
AmqpPublisher(AmqpPublisherBuilder builder) {
5858
super(builder.listeners());
5959
this.id = ID_SEQUENCE.getAndIncrement();
60-
this.executorService = builder.connection().executorService();
60+
this.executorService = builder.connection().environment().publisherExecutorService();
6161
this.address = builder.address();
6262
this.destinationSpec = builder.destination();
6363
this.connection = builder.connection();

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConnectionRecoveryTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ static void initAll() {
5656
});
5757
environment =
5858
new AmqpEnvironment(
59+
null,
60+
null,
5961
null,
6062
null,
6163
connectionSettings,

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.Collections;
3535
import java.util.List;
3636
import java.util.concurrent.ExecutorService;
37-
import java.util.concurrent.Executors;
3837
import java.util.concurrent.ThreadFactory;
3938
import java.util.concurrent.atomic.AtomicBoolean;
4039
import java.util.concurrent.atomic.AtomicInteger;
@@ -69,14 +68,8 @@ static void initAll() {
6968

7069
@BeforeEach
7170
void init(TestInfo info) {
72-
executorService = Executors.newCachedThreadPool(Utils.threadFactory("recover-cluster-test-"));
7371
environment =
74-
new AmqpEnvironmentBuilder()
75-
.connectionSettings()
76-
.uris(URIS)
77-
.environmentBuilder()
78-
.executorService(executorService)
79-
.build();
72+
new AmqpEnvironmentBuilder().connectionSettings().uris(URIS).environmentBuilder().build();
8073
this.connection = connection(b -> b.name("c-management").recovery().connectionBuilder());
8174
this.management = connection.management();
8275
this.testInfo = info;

0 commit comments

Comments
 (0)