Skip to content

Commit 769d8c6

Browse files
committed
Use Executor type for dispatching, not ExecutorService
It allows other abstractions like Spring's TaskExecutor to be used. References #160
1 parent 5272d35 commit 769d8c6

File tree

11 files changed

+59
-53
lines changed

11 files changed

+59
-53
lines changed

src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.util.concurrent.Executor;
2021
import java.util.concurrent.ExecutorService;
2122

2223
/** Builder for {@link Connection} instances. */
@@ -38,21 +39,20 @@ public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder>
3839
ConnectionBuilder listeners(Resource.StateListener... listeners);
3940

4041
/**
41-
* Set the executor service to use for incoming message delivery.
42+
* Set the executor to use for incoming message delivery.
4243
*
43-
* <p>The executor service is shared between the connection consumers.
44+
* <p>The executor is shared between the connection consumers.
4445
*
45-
* <p>By default, an executor service with {@link Runtime#availableProcessors()} thread(s) is
46-
* created for the connection.
46+
* <p>By default, an {@link ExecutorService} with {@link Runtime#availableProcessors()} thread(s)
47+
* is created for the connection.
4748
*
4849
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
4950
*
50-
* @param executorService executor service for incoming message delivery
51+
* @param executor executor for incoming message delivery
5152
* @return this builder instance
52-
* @see
53-
* com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder#dispatchingExecutorService(ExecutorService)
53+
* @see com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder#dispatchingExecutor(ExecutorService)
5454
*/
55-
ConnectionBuilder dispatchingExecutorService(ExecutorService executorService);
55+
ConnectionBuilder dispatchingExecutor(Executor executor);
5656

5757
/**
5858
* Create the connection instance.

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
9999
private final Lock instanceLock = new ReentrantLock();
100100
private final boolean filterExpressionsSupported, setTokenSupported;
101101
private volatile ConsumerWorkService consumerWorkService;
102-
private volatile ExecutorService dispatchingExecutorService;
103-
private final boolean privateDispatchingExecutorService;
102+
private volatile Executor dispatchingExecutor;
103+
private final boolean privateDispatchingExecutor;
104104
private final CredentialsManager.Registration credentialsRegistration;
105105

106106
AmqpConnection(AmqpConnectionBuilder builder) {
@@ -120,16 +120,16 @@ final class AmqpConnection extends ResourceBase implements Connection {
120120

121121
this.topologyListener = createTopologyListener(builder);
122122

123-
ExecutorService des =
124-
builder.dispatchingExecutorService() == null
123+
Executor de =
124+
builder.dispatchingExecutor() == null
125125
? environment.dispatchingExecutorService()
126-
: builder.dispatchingExecutorService();
126+
: builder.dispatchingExecutor();
127127

128-
if (des == null) {
129-
this.privateDispatchingExecutorService = true;
128+
if (de == null) {
129+
this.privateDispatchingExecutor = true;
130130
} else {
131-
this.privateDispatchingExecutorService = false;
132-
this.dispatchingExecutorService = des;
131+
this.privateDispatchingExecutor = false;
132+
this.dispatchingExecutor = de;
133133
}
134134

135135
if (recoveryConfiguration.activated()) {
@@ -707,13 +707,13 @@ ConsumerWorkService consumerWorkService() {
707707
this.instanceLock.lock();
708708
try {
709709
if (this.consumerWorkService == null) {
710-
if (this.privateDispatchingExecutorService) {
711-
this.dispatchingExecutorService =
710+
if (this.privateDispatchingExecutor) {
711+
this.dispatchingExecutor =
712712
Executors.newFixedThreadPool(
713713
DEFAULT_NUM_THREADS, Utils.threadFactory("dispatching-" + this.name() + "-"));
714714
}
715715
this.consumerWorkService =
716-
new WorkPoolConsumerWorkService(this.dispatchingExecutorService, Duration.ZERO);
716+
new WorkPoolConsumerWorkService(this.dispatchingExecutor, Duration.ZERO);
717717
}
718718
return this.consumerWorkService;
719719
} finally {
@@ -883,11 +883,11 @@ private void close(Throwable cause) {
883883
}
884884
}
885885
try {
886-
if (this.privateDispatchingExecutorService) {
887-
ExecutorService es = this.dispatchingExecutorService;
886+
if (this.privateDispatchingExecutor) {
887+
Executor es = this.dispatchingExecutor;
888888
if (es != null) {
889889
try {
890-
es.shutdownNow();
890+
((ExecutorService) es).shutdownNow();
891891
} catch (Exception e) {
892892
LOGGER.info(
893893
"Error while shutting down dispatching executor service for connection '{}': {}",

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnectionBuilder.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import java.time.Duration;
2323
import java.util.ArrayList;
2424
import java.util.List;
25-
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executor;
2626

2727
class AmqpConnectionBuilder implements ConnectionBuilder {
2828

@@ -32,7 +32,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder {
3232
private final DefaultConnectionSettings<AmqpConnectionBuilder> connectionSettings =
3333
new AmqpConnectionBuilderConnectionSettings(this);
3434
private final List<Resource.StateListener> listeners = new ArrayList<>();
35-
private ExecutorService dispatchingExecutorService;
35+
private Executor dispatchingExecutor;
3636
private String name;
3737
private TopologyListener topologyListener;
3838
private boolean isolateResources = false;
@@ -123,8 +123,8 @@ public ConnectionBuilder listeners(Resource.StateListener... listeners) {
123123
}
124124

125125
@Override
126-
public ConnectionBuilder dispatchingExecutorService(ExecutorService executorService) {
127-
this.dispatchingExecutorService = executorService;
126+
public ConnectionBuilder dispatchingExecutor(Executor executor) {
127+
this.dispatchingExecutor = executor;
128128
return this;
129129
}
130130

@@ -143,8 +143,8 @@ boolean isolateResources() {
143143
return isolateResources;
144144
}
145145

146-
ExecutorService dispatchingExecutorService() {
147-
return this.dispatchingExecutorService;
146+
Executor dispatchingExecutor() {
147+
return this.dispatchingExecutor;
148148
}
149149

150150
@Override
@@ -159,7 +159,7 @@ void copyTo(AmqpConnectionBuilder copy) {
159159
copy.name(this.name);
160160
copy.topologyListener(this.topologyListener);
161161
copy.isolateResources(this.isolateResources);
162-
copy.dispatchingExecutorService(this.dispatchingExecutorService);
162+
copy.dispatchingExecutor(this.dispatchingExecutor);
163163
}
164164

165165
AmqpConnectionBuilder name(String name) {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,9 +314,11 @@ void close(Throwable cause) {
314314
} catch (Exception e) {
315315
LOGGER.warn("Error while closing receiver", e);
316316
}
317-
318317
this.state(CLOSED, cause);
319-
this.metricsCollector.closeConsumer();
318+
MetricsCollector mc = this.metricsCollector;
319+
if (mc != null) {
320+
mc.closeConsumer();
321+
}
320322
}
321323
}
322324

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironment.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class AmqpEnvironment implements Environment {
4343
private final boolean internalPublisherExecutor;
4444
private final ExecutorService executorService;
4545
private final ScheduledExecutorService scheduledExecutorService;
46-
private final ExecutorService dispatchingExecutorService;
46+
private final Executor dispatchingExecutorService;
4747
private final ExecutorService publisherExecutorService;
4848
private final ConnectionManager connectionManager = new ConnectionManager(this);
4949
private final long id;
@@ -61,7 +61,7 @@ class AmqpEnvironment implements Environment {
6161
AmqpEnvironment(
6262
ExecutorService executorService,
6363
ScheduledExecutorService scheduledExecutorService,
64-
ExecutorService dispatchingExecutorService,
64+
Executor dispatchingExecutorService,
6565
ExecutorService publisherExecutorService,
6666
DefaultConnectionSettings<?> connectionSettings,
6767
MetricsCollector metricsCollector,
@@ -166,7 +166,7 @@ ExecutorService executorService() {
166166
return this.executorService;
167167
}
168168

169-
ExecutorService dispatchingExecutorService() {
169+
Executor dispatchingExecutorService() {
170170
return this.dispatchingExecutorService;
171171
}
172172

src/main/java/com/rabbitmq/client/amqp/impl/AmqpEnvironmentBuilder.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
2222
import com.rabbitmq.client.amqp.metrics.NoOpMetricsCollector;
2323
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24+
import java.util.concurrent.Executor;
2425
import java.util.concurrent.ExecutorService;
2526
import java.util.concurrent.ScheduledExecutorService;
2627

@@ -31,7 +32,7 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
3132
new DefaultEnvironmentConnectionSettings(this);
3233
private ExecutorService executorService;
3334
private ScheduledExecutorService scheduledExecutorService;
34-
private ExecutorService dispatchingExecutorService;
35+
private Executor dispatchingExecutor;
3536
private ExecutorService publisherExecutorService;
3637
private MetricsCollector metricsCollector = NoOpMetricsCollector.INSTANCE;
3738
private ObservationCollector observationCollector = Utils.NO_OP_OBSERVATION_COLLECTOR;
@@ -52,20 +53,20 @@ public AmqpEnvironmentBuilder executorService(ExecutorService executorService) {
5253
}
5354

5455
/**
55-
* Set the shared executor service to use for incoming message delivery in this environment
56-
* instance connections.
56+
* Set the shared executor to use for incoming message delivery in this environment instance
57+
* connections.
5758
*
58-
* <p>There is no shared executor service by default, each connection uses its own, see {@link
59-
* ConnectionBuilder#dispatchingExecutorService(ExecutorService)}.
59+
* <p>There is no shared executor by default, each connection uses its own, see {@link
60+
* ConnectionBuilder#dispatchingExecutor(Executor)}.
6061
*
6162
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
6263
*
63-
* @param executorService the executor service for incoming message delivery
64+
* @param executor the executor for incoming message delivery
6465
* @return this builder instance
65-
* @see ConnectionBuilder#dispatchingExecutorService(ExecutorService)
66+
* @see ConnectionBuilder#dispatchingExecutor(Executor)
6667
*/
67-
public AmqpEnvironmentBuilder dispatchingExecutorService(ExecutorService executorService) {
68-
this.dispatchingExecutorService = executorService;
68+
public AmqpEnvironmentBuilder dispatchingExecutor(Executor executor) {
69+
this.dispatchingExecutor = executor;
6970
return this;
7071
}
7172

@@ -161,7 +162,7 @@ public Environment build() {
161162
return new AmqpEnvironment(
162163
executorService,
163164
scheduledExecutorService,
164-
dispatchingExecutorService,
165+
dispatchingExecutor,
165166
publisherExecutorService,
166167
connectionSettings,
167168
metricsCollector,

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,10 @@ void close(Throwable cause) {
216216
this.sessionHandler,
217217
e -> LOGGER.info("Error while closing publisher session handler", e));
218218
this.state(State.CLOSED, cause);
219-
this.metricsCollector.closePublisher();
219+
MetricsCollector mc = this.metricsCollector;
220+
if (mc != null) {
221+
mc.closePublisher();
222+
}
220223
}
221224
}
222225

src/main/java/com/rabbitmq/client/amqp/impl/WorkPoolConsumerWorkService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020
import java.time.Duration;
2121
import java.util.ArrayList;
2222
import java.util.List;
23-
import java.util.concurrent.ExecutorService;
23+
import java.util.concurrent.Executor;
2424

2525
final class WorkPoolConsumerWorkService implements ConsumerWorkService {
2626

2727
private static final int MAX_RUNNABLE_BLOCK_SIZE = 256;
2828

29-
private final ExecutorService executor;
29+
private final Executor executor;
3030
private final WorkPool<AmqpConsumer, Runnable> workPool;
3131

32-
WorkPoolConsumerWorkService(ExecutorService executorService, Duration queueingTimeout) {
32+
WorkPoolConsumerWorkService(Executor executorService, Duration queueingTimeout) {
3333
this.executor = executorService;
3434
this.workPool = new WorkPool<>(queueingTimeout);
3535
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -801,8 +801,7 @@ void messagesAreDispatchedToEnvironmentConnectionExecutorService() {
801801
String connPrefix = "conn-";
802802
ExecutorService envExecutor = newSingleThreadExecutor(threadFactory(envPrefix));
803803
ExecutorService connExecutor = newSingleThreadExecutor(threadFactory(connPrefix));
804-
Environment env =
805-
TestUtils.environmentBuilder().dispatchingExecutorService(envExecutor).build();
804+
Environment env = TestUtils.environmentBuilder().dispatchingExecutor(envExecutor).build();
806805
try {
807806
BiConsumer<Connection, String> operation =
808807
(c, prefix) -> {
@@ -825,7 +824,7 @@ void messagesAreDispatchedToEnvironmentConnectionExecutorService() {
825824
};
826825

827826
Connection c1 = env.connectionBuilder().build();
828-
Connection c2 = env.connectionBuilder().dispatchingExecutorService(connExecutor).build();
827+
Connection c2 = env.connectionBuilder().dispatchingExecutor(connExecutor).build();
829828

830829
operation.accept(c1, envPrefix);
831830
operation.accept(c2, connPrefix);

src/test/java/com/rabbitmq/client/amqp/impl/Oauth2Test.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,7 @@ void tokenShouldBeRefreshedAutomatically(boolean shared, TestInfo info) throws E
230230
}
231231

232232
@Test
233+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
233234
void tokenOnHttpsShouldBeRefreshed(TestInfo info) throws Exception {
234235
KeyStore keyStore = generateKeyPair();
235236

src/test/java/com/rabbitmq/client/amqp/impl/RecoveryClusterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ void init(TestInfo info) {
8282
Executors.newSingleThreadExecutor(Utils.threadFactory("env-dispatching-executor-service-"));
8383
environment =
8484
new AmqpEnvironmentBuilder()
85-
.dispatchingExecutorService(dispatchingExecutorService)
85+
.dispatchingExecutor(dispatchingExecutorService)
8686
.connectionSettings()
8787
.uris(URIS)
8888
.environmentBuilder()

0 commit comments

Comments
 (0)