Skip to content

Commit fc0d676

Browse files
committed
Add dispatching executor service setting to connection and consumer
References #160
1 parent fbc8352 commit fc0d676

File tree

6 files changed

+80
-10
lines changed

6 files changed

+80
-10
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.util.concurrent.ExecutorService;
21+
2022
/** Builder for {@link Connection} instances. */
2123
public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder> {
2224

@@ -35,6 +37,22 @@ public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder>
3537
*/
3638
ConnectionBuilder listeners(Resource.StateListener... listeners);
3739

40+
/**
41+
* Set the executor service to use for incoming message delivery.
42+
*
43+
* <p>The executor service is shared between the connection consumers, unless a consumer sets its
44+
* own executor service with {@link ConsumerBuilder#dispatchingExecutorService(ExecutorService)}.
45+
*
46+
* <p>By default, a new single-threaded executor is created.
47+
*
48+
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
49+
*
50+
* @param executorService executor service for incoming message delivery
51+
* @return this builder instance
52+
* @see ConsumerBuilder#dispatchingExecutorService(ExecutorService)
53+
*/
54+
ConnectionBuilder dispatchingExecutorService(ExecutorService executorService);
55+
3856
/**
3957
* Create the connection instance.
4058
*

src/main/java/com/rabbitmq/client/amqp/ConsumerBuilder.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.math.BigDecimal;
2121
import java.time.Instant;
2222
import java.util.UUID;
23+
import java.util.concurrent.ExecutorService;
2324
import org.apache.qpid.protonj2.types.*;
2425

2526
/** API to configure and create a {@link Consumer}. */
@@ -68,6 +69,19 @@ public interface ConsumerBuilder {
6869
*/
6970
ConsumerBuilder listeners(Resource.StateListener... listeners);
7071

72+
/**
73+
* Set the executor service to use for incoming message delivery for this consumer.
74+
*
75+
* <p>The consumer uses the connection's executor service by default.
76+
*
77+
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
78+
*
79+
* @param executorService executor service for incoming message delivery
80+
* @return this builder instance
81+
* @see ConnectionBuilder#dispatchingExecutorService(ExecutorService)
82+
*/
83+
ConsumerBuilder dispatchingExecutorService(ExecutorService executorService);
84+
7185
/**
7286
* Options for a consumer consuming from a stream.
7387
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
9797
private final Lock instanceLock = new ReentrantLock();
9898
private final boolean filterExpressionsSupported, setTokenSupported;
9999
private volatile ExecutorService dispatchingExecutorService;
100+
private final boolean privateDispatchingExecutorService;
100101
private final CredentialsManager.Registration credentialsRegistration;
101102

102103
AmqpConnection(AmqpConnectionBuilder builder) {
@@ -116,6 +117,13 @@ final class AmqpConnection extends ResourceBase implements Connection {
116117

117118
this.topologyListener = createTopologyListener(builder);
118119

120+
if (builder.dispatchingExecutorService() == null) {
121+
this.privateDispatchingExecutorService = true;
122+
} else {
123+
this.privateDispatchingExecutorService = false;
124+
this.dispatchingExecutorService = builder.dispatchingExecutorService();
125+
}
126+
119127
if (recoveryConfiguration.activated()) {
120128
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, this.name());
121129
} else {
@@ -853,15 +861,17 @@ private void close(Throwable cause) {
853861
LOGGER.info("Interrupted while waiting for connection lock");
854862
}
855863
try {
856-
ExecutorService es = this.dispatchingExecutorService;
857-
if (es != null) {
858-
try {
859-
es.shutdownNow();
860-
} catch (Exception e) {
861-
LOGGER.info(
862-
"Error while shutting down dispatching executor service for connection '{}': {}",
863-
this.name(),
864-
e.getMessage());
864+
if (this.privateDispatchingExecutorService) {
865+
ExecutorService es = this.dispatchingExecutorService;
866+
if (es != null) {
867+
try {
868+
es.shutdownNow();
869+
} catch (Exception e) {
870+
LOGGER.info(
871+
"Error while shutting down dispatching executor service for connection '{}': {}",
872+
this.name(),
873+
e.getMessage());
874+
}
865875
}
866876
}
867877
try {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.time.Duration;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.ExecutorService;
2526

2627
class AmqpConnectionBuilder implements ConnectionBuilder {
2728

@@ -31,6 +32,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder {
3132
private final DefaultConnectionSettings<AmqpConnectionBuilder> connectionSettings =
3233
new AmqpConnectionBuilderConnectionSettings(this);
3334
private final List<Resource.StateListener> listeners = new ArrayList<>();
35+
private ExecutorService dispatchingExecutorService;
3436
private String name;
3537
private TopologyListener topologyListener;
3638
private boolean isolateResources = false;
@@ -120,6 +122,12 @@ public ConnectionBuilder listeners(Resource.StateListener... listeners) {
120122
return this;
121123
}
122124

125+
@Override
126+
public ConnectionBuilder dispatchingExecutorService(ExecutorService executorService) {
127+
this.dispatchingExecutorService = executorService;
128+
return this;
129+
}
130+
123131
@Override
124132
public RecoveryConfiguration recovery() {
125133
this.recoveryConfiguration.activated(true);
@@ -135,6 +143,10 @@ boolean isolateResources() {
135143
return isolateResources;
136144
}
137145

146+
ExecutorService dispatchingExecutorService() {
147+
return this.dispatchingExecutorService;
148+
}
149+
138150
@Override
139151
public Connection build() {
140152
return this.environment.connection(this);
@@ -147,6 +159,7 @@ void copyTo(AmqpConnectionBuilder copy) {
147159
copy.name(this.name);
148160
copy.topologyListener(this.topologyListener);
149161
copy.isolateResources(this.isolateResources);
162+
copy.dispatchingExecutorService(this.dispatchingExecutorService);
150163
}
151164

152165
AmqpConnectionBuilder name(String name) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
105105
this.connection = builder.connection();
106106
this.sessionHandler = this.connection.createSessionHandler();
107107

108-
this.dispatchingExecutorService = connection.dispatchingExecutorService();
108+
this.dispatchingExecutorService =
109+
builder.dispatchingExecutorService() == null
110+
? connection.dispatchingExecutorService()
111+
: builder.dispatchingExecutorService();
109112
this.nativeHandler = createNativeHandler(messageHandler);
110113
this.nativeCloseHandler =
111114
e ->

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.math.BigDecimal;
2626
import java.time.Instant;
2727
import java.util.*;
28+
import java.util.concurrent.ExecutorService;
2829
import org.apache.qpid.protonj2.types.*;
2930

3031
class AmqpConsumerBuilder implements ConsumerBuilder {
@@ -36,6 +37,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3637
private Consumer.MessageHandler messageHandler;
3738
private int initialCredits = 100;
3839
private final List<Resource.StateListener> listeners = new ArrayList<>();
40+
private ExecutorService dispatchingExecutorService;
3941
private final Map<String, DescribedType> filters = new LinkedHashMap<>();
4042
private final Map<String, Object> properties = new LinkedHashMap<>();
4143
private final StreamOptions streamOptions = new DefaultStreamOptions(this, this.filters);
@@ -79,6 +81,12 @@ public ConsumerBuilder listeners(Resource.StateListener... listeners) {
7981
return this;
8082
}
8183

84+
@Override
85+
public ConsumerBuilder dispatchingExecutorService(ExecutorService executorService) {
86+
this.dispatchingExecutorService = executorService;
87+
return this;
88+
}
89+
8290
@Override
8391
public StreamOptions stream() {
8492
return this.streamOptions;
@@ -118,6 +126,10 @@ List<Resource.StateListener> listeners() {
118126
return listeners;
119127
}
120128

129+
ExecutorService dispatchingExecutorService() {
130+
return this.dispatchingExecutorService;
131+
}
132+
121133
Map<String, DescribedType> filters() {
122134
return this.filters;
123135
}

0 commit comments

Comments
 (0)