Skip to content

Commit 6fab398

Browse files
committed
Use work pool to dispatch incoming messages
It is based on the mechanism the RabbitMQ AMQP 091 Java client uses. The work pool accumulates the message processing tasks and a work service dispatches the work blocks to an executor. Each connection creates its own executor service if none is set. An executor service can be set at the environment level and shared between all connections. An executor service can be set at the connection level as well. References #160
1 parent fc0d676 commit 6fab398

16 files changed

+845
-104
lines changed

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.math.BigDecimal;
2121
import java.time.Instant;
2222
import java.util.UUID;
23-
import java.util.concurrent.ExecutorService;
2423
import org.apache.qpid.protonj2.types.*;
2524

2625
/** API to configure and create a {@link Consumer}. */
@@ -69,19 +68,6 @@ public interface ConsumerBuilder {
6968
*/
7069
ConsumerBuilder listeners(Resource.StateListener... listeners);
7170

72-
/**
73-
* Set the executor service to use for incoming message delivery for this consumer.
74-
*
75-
* <p>The consumer uses the connection's executor service by default.
76-
*
77-
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
78-
*
79-
* @param executorService executor service for incoming message delivery
80-
* @return this builder instance
81-
* @see ConnectionBuilder#dispatchingExecutorService(ExecutorService)
82-
*/
83-
ConsumerBuilder dispatchingExecutorService(ExecutorService executorService);
84-
8571
/**
8672
* Options for a consumer consuming from a stream.
8773
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252

5353
final class AmqpConnection extends ResourceBase implements Connection {
5454

55+
private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.AVAILABLE_PROCESSORS);
56+
5557
/** Connection-related issues */
5658
private static final Predicate<Exception> CONNECTION_EXCEPTION_PREDICATE =
5759
e -> e instanceof AmqpException.AmqpConnectionException;
@@ -96,6 +98,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
9698
private final String name;
9799
private final Lock instanceLock = new ReentrantLock();
98100
private final boolean filterExpressionsSupported, setTokenSupported;
101+
private volatile ConsumerWorkService consumerWorkService;
99102
private volatile ExecutorService dispatchingExecutorService;
100103
private final boolean privateDispatchingExecutorService;
101104
private final CredentialsManager.Registration credentialsRegistration;
@@ -117,11 +120,16 @@ final class AmqpConnection extends ResourceBase implements Connection {
117120

118121
this.topologyListener = createTopologyListener(builder);
119122

120-
if (builder.dispatchingExecutorService() == null) {
123+
ExecutorService des =
124+
builder.dispatchingExecutorService() == null
125+
? environment.dispatchingExecutorService()
126+
: builder.dispatchingExecutorService();
127+
128+
if (des == null) {
121129
this.privateDispatchingExecutorService = true;
122130
} else {
123131
this.privateDispatchingExecutorService = false;
124-
this.dispatchingExecutorService = builder.dispatchingExecutorService();
132+
this.dispatchingExecutorService = des;
125133
}
126134

127135
if (recoveryConfiguration.activated()) {
@@ -688,22 +696,26 @@ ScheduledExecutorService scheduledExecutorService() {
688696
return this.environment.scheduledExecutorService();
689697
}
690698

691-
ExecutorService dispatchingExecutorService() {
699+
ConsumerWorkService consumerWorkService() {
692700
checkOpen();
693701

694-
ExecutorService result = this.dispatchingExecutorService;
702+
ConsumerWorkService result = this.consumerWorkService;
695703
if (result != null) {
696704
return result;
697705
}
698706

699707
this.instanceLock.lock();
700708
try {
701-
if (this.dispatchingExecutorService == null) {
702-
this.dispatchingExecutorService =
703-
Executors.newSingleThreadExecutor(
704-
Utils.threadFactory("dispatching-" + this.name() + "-"));
709+
if (this.consumerWorkService == null) {
710+
if (this.privateDispatchingExecutorService) {
711+
this.dispatchingExecutorService =
712+
Executors.newFixedThreadPool(
713+
DEFAULT_NUM_THREADS, Utils.threadFactory("dispatching-" + this.name() + "-"));
714+
}
715+
this.consumerWorkService =
716+
new WorkPoolConsumerWorkService(this.dispatchingExecutorService, Duration.ZERO);
705717
}
706-
return this.dispatchingExecutorService;
718+
return this.consumerWorkService;
707719
} finally {
708720
this.instanceLock.unlock();
709721
}
@@ -860,6 +872,16 @@ private void close(Throwable cause) {
860872
} catch (InterruptedException e) {
861873
LOGGER.info("Interrupted while waiting for connection lock");
862874
}
875+
if (this.consumerWorkService != null) {
876+
try {
877+
this.consumerWorkService.close();
878+
} catch (Exception e) {
879+
LOGGER.info(
880+
"Error while closing consumer work service for connection '{}': {}",
881+
this.name(),
882+
e.getMessage());
883+
}
884+
}
863885
try {
864886
if (this.privateDispatchingExecutorService) {
865887
ExecutorService es = this.dispatchingExecutorService;

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 43 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7575
private final SessionHandler sessionHandler;
7676
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
7777
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
78-
private final ExecutorService dispatchingExecutorService;
7978
private final java.util.function.Consumer<Delivery> nativeHandler;
8079
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
80+
private final ConsumerWorkService consumerWorkService;
8181
// native receiver internal state, accessed only in the native executor/scheduler
8282
private ProtonReceiver protonReceiver;
8383
private volatile Scheduler protonExecutor;
@@ -104,19 +104,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
104104
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
105105
this.connection = builder.connection();
106106
this.sessionHandler = this.connection.createSessionHandler();
107-
108-
this.dispatchingExecutorService =
109-
builder.dispatchingExecutorService() == null
110-
? connection.dispatchingExecutorService()
111-
: builder.dispatchingExecutorService();
112107
this.nativeHandler = createNativeHandler(messageHandler);
113108
this.nativeCloseHandler =
114-
e ->
115-
this.dispatchingExecutorService.submit(
116-
() -> {
117-
// get result to make spotbugs happy
118-
boolean ignored = maybeCloseConsumerOnException(this, e);
119-
});
109+
e -> {
110+
this.connection
111+
.consumerWorkService()
112+
.dispatch(
113+
() -> {
114+
// get result to make spotbugs happy
115+
boolean ignored = maybeCloseConsumerOnException(this, e);
116+
});
117+
};
118+
this.consumerWorkService = connection.consumerWorkService();
119+
this.consumerWorkService.register(this);
120120
this.nativeReceiver =
121121
this.createNativeReceiver(
122122
this.sessionHandler.session(),
@@ -236,33 +236,36 @@ private ClientReceiver createNativeReceiver(
236236

237237
private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler handler) {
238238
return delivery -> {
239-
this.unsettledMessageCount.incrementAndGet();
240-
this.metricsCollector.consume();
241-
this.dispatchingExecutorService.submit(
242-
() -> {
243-
AmqpMessage message;
244-
try {
245-
message = new AmqpMessage(delivery.message());
246-
} catch (ClientException e) {
247-
LOGGER.warn("Error while decoding message: {}", e.getMessage());
239+
if (this.state() == OPEN) {
240+
this.unsettledMessageCount.incrementAndGet();
241+
this.metricsCollector.consume();
242+
this.consumerWorkService.dispatch(
243+
this,
244+
() -> {
245+
AmqpMessage message;
248246
try {
249-
delivery.disposition(DeliveryState.rejected("", ""), true);
250-
} catch (ClientException ex) {
251-
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
247+
message = new AmqpMessage(delivery.message());
248+
} catch (ClientException e) {
249+
LOGGER.warn("Error while decoding message: {}", e.getMessage());
250+
try {
251+
delivery.disposition(DeliveryState.rejected("", ""), true);
252+
} catch (ClientException ex) {
253+
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
254+
}
255+
return;
252256
}
253-
return;
254-
}
255-
Consumer.Context context =
256-
new DeliveryContext(
257-
delivery,
258-
this.protonExecutor,
259-
this.protonReceiver,
260-
this.metricsCollector,
261-
this.unsettledMessageCount,
262-
this.replenishCreditOperation,
263-
this);
264-
handler.handle(context, message);
265-
});
257+
Consumer.Context context =
258+
new DeliveryContext(
259+
delivery,
260+
this.protonExecutor,
261+
this.protonReceiver,
262+
this.metricsCollector,
263+
this.unsettledMessageCount,
264+
this.replenishCreditOperation,
265+
this);
266+
handler.handle(context, message);
267+
});
268+
}
266269
};
267270
}
268271

@@ -299,6 +302,9 @@ void recoverAfterConnectionFailure() {
299302
void close(Throwable cause) {
300303
if (this.closed.compareAndSet(false, true)) {
301304
this.state(CLOSING, cause);
305+
if (this.consumerWorkService != null) {
306+
this.consumerWorkService.unregister(this);
307+
}
302308
this.connection.removeConsumer(this);
303309
try {
304310
if (this.nativeReceiver != null) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.math.BigDecimal;
2626
import java.time.Instant;
2727
import java.util.*;
28-
import java.util.concurrent.ExecutorService;
2928
import org.apache.qpid.protonj2.types.*;
3029

3130
class AmqpConsumerBuilder implements ConsumerBuilder {
@@ -37,7 +36,6 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3736
private Consumer.MessageHandler messageHandler;
3837
private int initialCredits = 100;
3938
private final List<Resource.StateListener> listeners = new ArrayList<>();
40-
private ExecutorService dispatchingExecutorService;
4139
private final Map<String, DescribedType> filters = new LinkedHashMap<>();
4240
private final Map<String, Object> properties = new LinkedHashMap<>();
4341
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
@@ -81,12 +79,6 @@ public ConsumerBuilder listeners(Resource.StateListener... listeners) {
8179
return this;
8280
}
8381

84-
@Override
85-
public ConsumerBuilder dispatchingExecutorService(ExecutorService executorService) {
86-
this.dispatchingExecutorService = executorService;
87-
return this;
88-
}
89-
9082
@Override
9183
public StreamOptions stream() {
9284
return this.streamOptions;
@@ -126,10 +118,6 @@ List<Resource.StateListener> listeners() {
126118
return listeners;
127119
}
128120

129-
ExecutorService dispatchingExecutorService() {
130-
return this.dispatchingExecutorService;
131-
}
132-
133121
Map<String, DescribedType> filters() {
134122
return this.filters;
135123
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class AmqpEnvironment implements Environment {
4343
private final boolean internalPublisherExecutor;
4444
private final ExecutorService executorService;
4545
private final ScheduledExecutorService scheduledExecutorService;
46+
private final ExecutorService dispatchingExecutorService;
4647
private final ExecutorService publisherExecutorService;
4748
private final ConnectionManager connectionManager = new ConnectionManager(this);
4849
private final long id;
@@ -60,6 +61,7 @@ class AmqpEnvironment implements Environment {
6061
AmqpEnvironment(
6162
ExecutorService executorService,
6263
ScheduledExecutorService scheduledExecutorService,
64+
ExecutorService dispatchingExecutorService,
6365
ExecutorService publisherExecutorService,
6466
DefaultConnectionSettings<?> connectionSettings,
6567
MetricsCollector metricsCollector,
@@ -86,6 +88,7 @@ class AmqpEnvironment implements Environment {
8688
this.scheduledExecutorService = scheduledExecutorService;
8789
this.internalScheduledExecutor = false;
8890
}
91+
this.dispatchingExecutorService = dispatchingExecutorService;
8992
if (publisherExecutorService == null) {
9093
this.publisherExecutorService = Utils.executorService(threadPrefix);
9194
this.internalPublisherExecutor = true;
@@ -163,6 +166,10 @@ ExecutorService executorService() {
163166
return this.executorService;
164167
}
165168

169+
ExecutorService dispatchingExecutorService() {
170+
return this.dispatchingExecutorService;
171+
}
172+
166173
ExecutorService publisherExecutorService() {
167174
return this.publisherExecutorService;
168175
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import com.rabbitmq.client.amqp.ConnectionSettings;
21-
import com.rabbitmq.client.amqp.Environment;
22-
import com.rabbitmq.client.amqp.EnvironmentBuilder;
23-
import com.rabbitmq.client.amqp.ObservationCollector;
20+
import com.rabbitmq.client.amqp.*;
2421
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
2522
import com.rabbitmq.client.amqp.metrics.NoOpMetricsCollector;
2623
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -34,6 +31,7 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
3431
new DefaultEnvironmentConnectionSettings(this);
3532
private ExecutorService executorService;
3633
private ScheduledExecutorService scheduledExecutorService;
34+
private ExecutorService dispatchingExecutorService;
3735
private ExecutorService publisherExecutorService;
3836
private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE;
3937
private ObservationCollector observationCollector = Utils.NO_OP_OBSERVATION_COLLECTOR;
@@ -53,6 +51,11 @@ public AmqpEnvironmentBuilder executorService(ExecutorService executorService) {
5351
return this;
5452
}
5553

54+
public AmqpEnvironmentBuilder dispatchingExecutorService(ExecutorService executorService) {
55+
this.dispatchingExecutorService = executorService;
56+
return this;
57+
}
58+
5659
/**
5760
* Set scheduled executor service used for internal tasks (e.g. connection recovery).
5861
*
@@ -145,6 +148,7 @@ public Environment build() {
145148
return new AmqpEnvironment(
146149
executorService,
147150
scheduledExecutorService,
151+
dispatchingExecutorService,
148152
publisherExecutorService,
149153
connectionSettings,
150154
metricsCollector,

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
5555
private final Duration publishTimeout;
5656
private final SessionHandler sessionHandler;
5757
private volatile ObservationCollector.ConnectionInfo connectionInfo;
58-
private final ExecutorService dispatchingExecutorService;
5958
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
6059

6160
AmqpPublisher(AmqpPublisherBuilder builder) {
@@ -67,14 +66,16 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
6766
this.connection = builder.connection();
6867
this.publishTimeout = builder.publishTimeout();
6968
this.sessionHandler = this.connection.createSessionHandler();
70-
this.dispatchingExecutorService = connection.dispatchingExecutorService();
7169
this.nativeCloseHandler =
72-
e ->
73-
this.dispatchingExecutorService.submit(
74-
() -> {
75-
// get result to make spotbugs happy
76-
boolean ignored = maybeCloseConsumerOnException(this, e);
77-
});
70+
e -> {
71+
this.connection
72+
.consumerWorkService()
73+
.dispatch(
74+
() -> {
75+
// get result to make spotbugs happy
76+
boolean ignored = maybeCloseConsumerOnException(this, e);
77+
});
78+
};
7879
this.sender =
7980
this.createSender(
8081
sessionHandler.session(), this.address, this.publishTimeout, this.nativeCloseHandler);

0 commit comments

Comments
 (0)