Skip to content

Commit 2bdcfaf

Browse files
authored
Merge pull request #162 from rabbitmq/set-dispatching-executor-service
Use work pool and dispatch executor for incoming message processing
2 parents a9fc146 + 769d8c6 commit 2bdcfaf

16 files changed

+916
-86
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.util.concurrent.Executor;
21+
import java.util.concurrent.ExecutorService;
22+
2023
/** Builder for {@link Connection} instances. */
2124
public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder> {
2225

@@ -35,6 +38,22 @@ public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder>
3538
*/
3639
ConnectionBuilder listeners(Resource.StateListener... listeners);
3740

41+
/**
42+
* Set the executor to use for incoming message delivery.
43+
*
44+
* <p>The executor is shared between the connection consumers.
45+
*
46+
* <p>By default, an {@link ExecutorService} with {@link Runtime#availableProcessors()} thread(s)
47+
* is created for the connection.
48+
*
49+
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
50+
*
51+
* @param executor executor for incoming message delivery
52+
* @return this builder instance
53+
* @see com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder#dispatchingExecutor(ExecutorService)
54+
*/
55+
ConnectionBuilder dispatchingExecutor(Executor executor);
56+
3857
/**
3958
* Create the connection instance.
4059
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252

5353
final class AmqpConnection extends ResourceBase implements Connection {
5454

55+
private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.AVAILABLE_PROCESSORS);
56+
5557
/** Connection-related issues */
5658
private static final Predicate<Exception> CONNECTION_EXCEPTION_PREDICATE =
5759
e -> e instanceof AmqpException.AmqpConnectionException;
@@ -96,7 +98,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
9698
private final String name;
9799
private final Lock instanceLock = new ReentrantLock();
98100
private final boolean filterExpressionsSupported, setTokenSupported;
99-
private volatile ExecutorService dispatchingExecutorService;
101+
private volatile ConsumerWorkService consumerWorkService;
102+
private volatile Executor dispatchingExecutor;
103+
private final boolean privateDispatchingExecutor;
100104
private final CredentialsManager.Registration credentialsRegistration;
101105

102106
AmqpConnection(AmqpConnectionBuilder builder) {
@@ -116,6 +120,18 @@ final class AmqpConnection extends ResourceBase implements Connection {
116120

117121
this.topologyListener = createTopologyListener(builder);
118122

123+
Executor de =
124+
builder.dispatchingExecutor() == null
125+
? environment.dispatchingExecutorService()
126+
: builder.dispatchingExecutor();
127+
128+
if (de == null) {
129+
this.privateDispatchingExecutor = true;
130+
} else {
131+
this.privateDispatchingExecutor = false;
132+
this.dispatchingExecutor = de;
133+
}
134+
119135
if (recoveryConfiguration.activated()) {
120136
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, this.name());
121137
} else {
@@ -680,22 +696,26 @@ ScheduledExecutorService scheduledExecutorService() {
680696
return this.environment.scheduledExecutorService();
681697
}
682698

683-
ExecutorService dispatchingExecutorService() {
699+
ConsumerWorkService consumerWorkService() {
684700
checkOpen();
685701

686-
ExecutorService result = this.dispatchingExecutorService;
702+
ConsumerWorkService result = this.consumerWorkService;
687703
if (result != null) {
688704
return result;
689705
}
690706

691707
this.instanceLock.lock();
692708
try {
693-
if (this.dispatchingExecutorService == null) {
694-
this.dispatchingExecutorService =
695-
Executors.newSingleThreadExecutor(
696-
Utils.threadFactory("dispatching-" + this.name() + "-"));
709+
if (this.consumerWorkService == null) {
710+
if (this.privateDispatchingExecutor) {
711+
this.dispatchingExecutor =
712+
Executors.newFixedThreadPool(
713+
DEFAULT_NUM_THREADS, Utils.threadFactory("dispatching-" + this.name() + "-"));
714+
}
715+
this.consumerWorkService =
716+
new WorkPoolConsumerWorkService(this.dispatchingExecutor, Duration.ZERO);
697717
}
698-
return this.dispatchingExecutorService;
718+
return this.consumerWorkService;
699719
} finally {
700720
this.instanceLock.unlock();
701721
}
@@ -852,16 +872,28 @@ private void close(Throwable cause) {
852872
} catch (InterruptedException e) {
853873
LOGGER.info("Interrupted while waiting for connection lock");
854874
}
875+
if (this.consumerWorkService != null) {
876+
try {
877+
this.consumerWorkService.close();
878+
} catch (Exception e) {
879+
LOGGER.info(
880+
"Error while closing consumer work service for connection '{}': {}",
881+
this.name(),
882+
e.getMessage());
883+
}
884+
}
855885
try {
856-
ExecutorService es = this.dispatchingExecutorService;
857-
if (es != null) {
858-
try {
859-
es.shutdownNow();
860-
} catch (Exception e) {
861-
LOGGER.info(
862-
"Error while shutting down dispatching executor service for connection '{}': {}",
863-
this.name(),
864-
e.getMessage());
886+
if (this.privateDispatchingExecutor) {
887+
Executor es = this.dispatchingExecutor;
888+
if (es != null) {
889+
try {
890+
((ExecutorService) es).shutdownNow();
891+
} catch (Exception e) {
892+
LOGGER.info(
893+
"Error while shutting down dispatching executor service for connection '{}': {}",
894+
this.name(),
895+
e.getMessage());
896+
}
865897
}
866898
}
867899
try {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Duration;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.Executor;
2526

2627
class AmqpConnectionBuilder implements ConnectionBuilder {
2728

@@ -31,6 +32,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder {
3132
private final DefaultConnectionSettings<AmqpConnectionBuilder> connectionSettings =
3233
new AmqpConnectionBuilderConnectionSettings(this);
3334
private final List<Resource.StateListener> listeners = new ArrayList<>();
35+
private Executor dispatchingExecutor;
3436
private String name;
3537
private TopologyListener topologyListener;
3638
private boolean isolateResources = false;
@@ -120,6 +122,12 @@ public ConnectionBuilder listeners(Resource.StateListener... listeners) {
120122
return this;
121123
}
122124

125+
@Override
126+
public ConnectionBuilder dispatchingExecutor(Executor executor) {
127+
this.dispatchingExecutor = executor;
128+
return this;
129+
}
130+
123131
@Override
124132
public RecoveryConfiguration recovery() {
125133
this.recoveryConfiguration.activated(true);
@@ -135,6 +143,10 @@ boolean isolateResources() {
135143
return isolateResources;
136144
}
137145

146+
Executor dispatchingExecutor() {
147+
return this.dispatchingExecutor;
148+
}
149+
138150
@Override
139151
public Connection build() {
140152
return this.environment.connection(this);
@@ -147,6 +159,7 @@ void copyTo(AmqpConnectionBuilder copy) {
147159
copy.name(this.name);
148160
copy.topologyListener(this.topologyListener);
149161
copy.isolateResources(this.isolateResources);
162+
copy.dispatchingExecutor(this.dispatchingExecutor);
150163
}
151164

152165
AmqpConnectionBuilder name(String name) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 47 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7575
private final SessionHandler sessionHandler;
7676
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
7777
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
78-
private final ExecutorService dispatchingExecutorService;
7978
private final java.util.function.Consumer<Delivery> nativeHandler;
8079
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
80+
private final ConsumerWorkService consumerWorkService;
8181
// native receiver internal state, accessed only in the native executor/scheduler
8282
private ProtonReceiver protonReceiver;
8383
private volatile Scheduler protonExecutor;
@@ -104,16 +104,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
104104
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
105105
this.connection = builder.connection();
106106
this.sessionHandler = this.connection.createSessionHandler();
107-
108-
this.dispatchingExecutorService = connection.dispatchingExecutorService();
109107
this.nativeHandler = createNativeHandler(messageHandler);
110108
this.nativeCloseHandler =
111-
e ->
112-
this.dispatchingExecutorService.submit(
113-
() -> {
114-
// get result to make spotbugs happy
115-
boolean ignored = maybeCloseConsumerOnException(this, e);
116-
});
109+
e -> {
110+
this.connection
111+
.consumerWorkService()
112+
.dispatch(
113+
() -> {
114+
// get result to make spotbugs happy
115+
boolean ignored = maybeCloseConsumerOnException(this, e);
116+
});
117+
};
118+
this.consumerWorkService = connection.consumerWorkService();
119+
this.consumerWorkService.register(this);
117120
this.nativeReceiver =
118121
this.createNativeReceiver(
119122
this.sessionHandler.session(),
@@ -233,33 +236,36 @@ private ClientReceiver createNativeReceiver(
233236

234237
private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler handler) {
235238
return delivery -> {
236-
this.unsettledMessageCount.incrementAndGet();
237-
this.metricsCollector.consume();
238-
this.dispatchingExecutorService.submit(
239-
() -> {
240-
AmqpMessage message;
241-
try {
242-
message = new AmqpMessage(delivery.message());
243-
} catch (ClientException e) {
244-
LOGGER.warn("Error while decoding message: {}", e.getMessage());
239+
if (this.state() == OPEN) {
240+
this.unsettledMessageCount.incrementAndGet();
241+
this.metricsCollector.consume();
242+
this.consumerWorkService.dispatch(
243+
this,
244+
() -> {
245+
AmqpMessage message;
245246
try {
246-
delivery.disposition(DeliveryState.rejected("", ""), true);
247-
} catch (ClientException ex) {
248-
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
247+
message = new AmqpMessage(delivery.message());
248+
} catch (ClientException e) {
249+
LOGGER.warn("Error while decoding message: {}", e.getMessage());
250+
try {
251+
delivery.disposition(DeliveryState.rejected("", ""), true);
252+
} catch (ClientException ex) {
253+
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
254+
}
255+
return;
249256
}
250-
return;
251-
}
252-
Consumer.Context context =
253-
new DeliveryContext(
254-
delivery,
255-
this.protonExecutor,
256-
this.protonReceiver,
257-
this.metricsCollector,
258-
this.unsettledMessageCount,
259-
this.replenishCreditOperation,
260-
this);
261-
handler.handle(context, message);
262-
});
257+
Consumer.Context context =
258+
new DeliveryContext(
259+
delivery,
260+
this.protonExecutor,
261+
this.protonReceiver,
262+
this.metricsCollector,
263+
this.unsettledMessageCount,
264+
this.replenishCreditOperation,
265+
this);
266+
handler.handle(context, message);
267+
});
268+
}
263269
};
264270
}
265271

@@ -296,6 +302,9 @@ void recoverAfterConnectionFailure() {
296302
void close(Throwable cause) {
297303
if (this.closed.compareAndSet(false, true)) {
298304
this.state(CLOSING, cause);
305+
if (this.consumerWorkService != null) {
306+
this.consumerWorkService.unregister(this);
307+
}
299308
this.connection.removeConsumer(this);
300309
try {
301310
if (this.nativeReceiver != null) {
@@ -305,9 +314,11 @@ void close(Throwable cause) {
305314
} catch (Exception e) {
306315
LOGGER.warn("Error while closing receiver", e);
307316
}
308-
309317
this.state(CLOSED, cause);
310-
this.metricsCollector.closeConsumer();
318+
MetricsCollector mc = this.metricsCollector;
319+
if (mc != null) {
320+
mc.closeConsumer();
321+
}
311322
}
312323
}
313324

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class AmqpEnvironment implements Environment {
4343
private final boolean internalPublisherExecutor;
4444
private final ExecutorService executorService;
4545
private final ScheduledExecutorService scheduledExecutorService;
46+
private final Executor dispatchingExecutorService;
4647
private final ExecutorService publisherExecutorService;
4748
private final ConnectionManager connectionManager = new ConnectionManager(this);
4849
private final long id;
@@ -60,6 +61,7 @@ class AmqpEnvironment implements Environment {
6061
AmqpEnvironment(
6162
ExecutorService executorService,
6263
ScheduledExecutorService scheduledExecutorService,
64+
Executor dispatchingExecutorService,
6365
ExecutorService publisherExecutorService,
6466
DefaultConnectionSettings<?> connectionSettings,
6567
MetricsCollector metricsCollector,
@@ -86,6 +88,7 @@ class AmqpEnvironment implements Environment {
8688
this.scheduledExecutorService = scheduledExecutorService;
8789
this.internalScheduledExecutor = false;
8890
}
91+
this.dispatchingExecutorService = dispatchingExecutorService;
8992
if (publisherExecutorService == null) {
9093
this.publisherExecutorService = Utils.executorService(threadPrefix);
9194
this.internalPublisherExecutor = true;
@@ -163,6 +166,10 @@ ExecutorService executorService() {
163166
return this.executorService;
164167
}
165168

169+
Executor dispatchingExecutorService() {
170+
return this.dispatchingExecutorService;
171+
}
172+
166173
ExecutorService publisherExecutorService() {
167174
return this.publisherExecutorService;
168175
}

0 commit comments

Comments
 (0)