Skip to content

Use work pool and dispatch executor for incoming message processing #162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/ConnectionBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
// [email protected].
package com.rabbitmq.client.amqp;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/** Builder for {@link Connection} instances. */
public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder> {

Expand All @@ -35,6 +38,22 @@ public interface ConnectionBuilder extends ConnectionSettings<ConnectionBuilder>
*/
ConnectionBuilder listeners(Resource.StateListener... listeners);

/**
* Set the executor to use for incoming message delivery.
*
* <p>The executor is shared between the connection consumers.
*
* <p>By default, an {@link ExecutorService} with {@link Runtime#availableProcessors()} thread(s)
* is created for the connection.
*
* <p>It is the developer's responsibility to shut down the executor when it is no longer needed.
*
* @param executor executor for incoming message delivery
* @return this builder instance
* @see com.rabbitmq.client.amqp.impl.AmqpEnvironmentBuilder#dispatchingExecutor(ExecutorService)
*/
ConnectionBuilder dispatchingExecutor(Executor executor);

/**
* Create the connection instance.
*
Expand Down
66 changes: 49 additions & 17 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@

final class AmqpConnection extends ResourceBase implements Connection {

private static final int DEFAULT_NUM_THREADS = Math.max(1, Utils.AVAILABLE_PROCESSORS);

/** Connection-related issues */
private static final Predicate<Exception> CONNECTION_EXCEPTION_PREDICATE =
e -> e instanceof AmqpException.AmqpConnectionException;
Expand Down Expand Up @@ -96,7 +98,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final String name;
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported, setTokenSupported;
private volatile ExecutorService dispatchingExecutorService;
private volatile ConsumerWorkService consumerWorkService;
private volatile Executor dispatchingExecutor;
private final boolean privateDispatchingExecutor;
private final CredentialsManager.Registration credentialsRegistration;

AmqpConnection(AmqpConnectionBuilder builder) {
Expand All @@ -116,6 +120,18 @@ final class AmqpConnection extends ResourceBase implements Connection {

this.topologyListener = createTopologyListener(builder);

Executor de =
builder.dispatchingExecutor() == null
? environment.dispatchingExecutorService()
: builder.dispatchingExecutor();

if (de == null) {
this.privateDispatchingExecutor = true;
} else {
this.privateDispatchingExecutor = false;
this.dispatchingExecutor = de;
}

if (recoveryConfiguration.activated()) {
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, this.name());
} else {
Expand Down Expand Up @@ -680,22 +696,26 @@ ScheduledExecutorService scheduledExecutorService() {
return this.environment.scheduledExecutorService();
}

ExecutorService dispatchingExecutorService() {
ConsumerWorkService consumerWorkService() {
checkOpen();

ExecutorService result = this.dispatchingExecutorService;
ConsumerWorkService result = this.consumerWorkService;
if (result != null) {
return result;
}

this.instanceLock.lock();
try {
if (this.dispatchingExecutorService == null) {
this.dispatchingExecutorService =
Executors.newSingleThreadExecutor(
Utils.threadFactory("dispatching-" + this.name() + "-"));
if (this.consumerWorkService == null) {
if (this.privateDispatchingExecutor) {
this.dispatchingExecutor =
Executors.newFixedThreadPool(
DEFAULT_NUM_THREADS, Utils.threadFactory("dispatching-" + this.name() + "-"));
}
this.consumerWorkService =
new WorkPoolConsumerWorkService(this.dispatchingExecutor, Duration.ZERO);
}
return this.dispatchingExecutorService;
return this.consumerWorkService;
} finally {
this.instanceLock.unlock();
}
Expand Down Expand Up @@ -852,16 +872,28 @@ private void close(Throwable cause) {
} catch (InterruptedException e) {
LOGGER.info("Interrupted while waiting for connection lock");
}
if (this.consumerWorkService != null) {
try {
this.consumerWorkService.close();
} catch (Exception e) {
LOGGER.info(
"Error while closing consumer work service for connection '{}': {}",
this.name(),
e.getMessage());
}
}
try {
ExecutorService es = this.dispatchingExecutorService;
if (es != null) {
try {
es.shutdownNow();
} catch (Exception e) {
LOGGER.info(
"Error while shutting down dispatching executor service for connection '{}': {}",
this.name(),
e.getMessage());
if (this.privateDispatchingExecutor) {
Executor es = this.dispatchingExecutor;
if (es != null) {
try {
((ExecutorService) es).shutdownNow();
} catch (Exception e) {
LOGGER.info(
"Error while shutting down dispatching executor service for connection '{}': {}",
this.name(),
e.getMessage());
}
}
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class AmqpConnectionBuilder implements ConnectionBuilder {

Expand All @@ -31,6 +32,7 @@ class AmqpConnectionBuilder implements ConnectionBuilder {
private final DefaultConnectionSettings<AmqpConnectionBuilder> connectionSettings =
new AmqpConnectionBuilderConnectionSettings(this);
private final List<Resource.StateListener> listeners = new ArrayList<>();
private Executor dispatchingExecutor;
private String name;
private TopologyListener topologyListener;
private boolean isolateResources = false;
Expand Down Expand Up @@ -120,6 +122,12 @@ public ConnectionBuilder listeners(Resource.StateListener... listeners) {
return this;
}

@Override
public ConnectionBuilder dispatchingExecutor(Executor executor) {
this.dispatchingExecutor = executor;
return this;
}

@Override
public RecoveryConfiguration recovery() {
this.recoveryConfiguration.activated(true);
Expand All @@ -135,6 +143,10 @@ boolean isolateResources() {
return isolateResources;
}

Executor dispatchingExecutor() {
return this.dispatchingExecutor;
}

@Override
public Connection build() {
return this.environment.connection(this);
Expand All @@ -147,6 +159,7 @@ void copyTo(AmqpConnectionBuilder copy) {
copy.name(this.name);
copy.topologyListener(this.topologyListener);
copy.isolateResources(this.isolateResources);
copy.dispatchingExecutor(this.dispatchingExecutor);
}

AmqpConnectionBuilder name(String name) {
Expand Down
83 changes: 47 additions & 36 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final SessionHandler sessionHandler;
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
private final ExecutorService dispatchingExecutorService;
private final java.util.function.Consumer<Delivery> nativeHandler;
private final java.util.function.Consumer<ClientException> nativeCloseHandler;
private final ConsumerWorkService consumerWorkService;
// native receiver internal state, accessed only in the native executor/scheduler
private ProtonReceiver protonReceiver;
private volatile Scheduler protonExecutor;
Expand All @@ -104,16 +104,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
this.connection = builder.connection();
this.sessionHandler = this.connection.createSessionHandler();

this.dispatchingExecutorService = connection.dispatchingExecutorService();
this.nativeHandler = createNativeHandler(messageHandler);
this.nativeCloseHandler =
e ->
this.dispatchingExecutorService.submit(
() -> {
// get result to make spotbugs happy
boolean ignored = maybeCloseConsumerOnException(this, e);
});
e -> {
this.connection
.consumerWorkService()
.dispatch(
() -> {
// get result to make spotbugs happy
boolean ignored = maybeCloseConsumerOnException(this, e);
});
};
this.consumerWorkService = connection.consumerWorkService();
this.consumerWorkService.register(this);
this.nativeReceiver =
this.createNativeReceiver(
this.sessionHandler.session(),
Expand Down Expand Up @@ -233,33 +236,36 @@ private ClientReceiver createNativeReceiver(

private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler handler) {
return delivery -> {
this.unsettledMessageCount.incrementAndGet();
this.metricsCollector.consume();
this.dispatchingExecutorService.submit(
() -> {
AmqpMessage message;
try {
message = new AmqpMessage(delivery.message());
} catch (ClientException e) {
LOGGER.warn("Error while decoding message: {}", e.getMessage());
if (this.state() == OPEN) {
this.unsettledMessageCount.incrementAndGet();
this.metricsCollector.consume();
this.consumerWorkService.dispatch(
this,
() -> {
AmqpMessage message;
try {
delivery.disposition(DeliveryState.rejected("", ""), true);
} catch (ClientException ex) {
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
message = new AmqpMessage(delivery.message());
} catch (ClientException e) {
LOGGER.warn("Error while decoding message: {}", e.getMessage());
try {
delivery.disposition(DeliveryState.rejected("", ""), true);
} catch (ClientException ex) {
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
}
return;
}
return;
}
Consumer.Context context =
new DeliveryContext(
delivery,
this.protonExecutor,
this.protonReceiver,
this.metricsCollector,
this.unsettledMessageCount,
this.replenishCreditOperation,
this);
handler.handle(context, message);
});
Consumer.Context context =
new DeliveryContext(
delivery,
this.protonExecutor,
this.protonReceiver,
this.metricsCollector,
this.unsettledMessageCount,
this.replenishCreditOperation,
this);
handler.handle(context, message);
});
}
};
}

Expand Down Expand Up @@ -296,6 +302,9 @@ void recoverAfterConnectionFailure() {
void close(Throwable cause) {
if (this.closed.compareAndSet(false, true)) {
this.state(CLOSING, cause);
if (this.consumerWorkService != null) {
this.consumerWorkService.unregister(this);
}
this.connection.removeConsumer(this);
try {
if (this.nativeReceiver != null) {
Expand All @@ -305,9 +314,11 @@ void close(Throwable cause) {
} catch (Exception e) {
LOGGER.warn("Error while closing receiver", e);
}

this.state(CLOSED, cause);
this.metricsCollector.closeConsumer();
MetricsCollector mc = this.metricsCollector;
if (mc != null) {
mc.closeConsumer();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class AmqpEnvironment implements Environment {
private final boolean internalPublisherExecutor;
private final ExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
private final Executor dispatchingExecutorService;
private final ExecutorService publisherExecutorService;
private final ConnectionManager connectionManager = new ConnectionManager(this);
private final long id;
Expand All @@ -60,6 +61,7 @@ class AmqpEnvironment implements Environment {
AmqpEnvironment(
ExecutorService executorService,
ScheduledExecutorService scheduledExecutorService,
Executor dispatchingExecutorService,
ExecutorService publisherExecutorService,
DefaultConnectionSettings<?> connectionSettings,
MetricsCollector metricsCollector,
Expand All @@ -86,6 +88,7 @@ class AmqpEnvironment implements Environment {
this.scheduledExecutorService = scheduledExecutorService;
this.internalScheduledExecutor = false;
}
this.dispatchingExecutorService = dispatchingExecutorService;
if (publisherExecutorService == null) {
this.publisherExecutorService = Utils.executorService(threadPrefix);
this.internalPublisherExecutor = true;
Expand Down Expand Up @@ -163,6 +166,10 @@ ExecutorService executorService() {
return this.executorService;
}

Executor dispatchingExecutorService() {
return this.dispatchingExecutorService;
}

ExecutorService publisherExecutorService() {
return this.publisherExecutorService;
}
Expand Down
Loading