Skip to content

Make consumer internals asynchronous #93

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 42 additions & 13 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final String name;
private final Lock instanceLock = new ReentrantLock();
private final boolean filterExpressionsSupported;
private volatile ExecutorService dispatchingExecutorService;

AmqpConnection(AmqpConnectionBuilder builder) {
super(builder.listeners());
Expand Down Expand Up @@ -547,22 +548,21 @@ Session nativeSession(boolean check) {
if (check) {
checkOpen();
}

Session result = this.nativeSession;
if (result == null) {
this.instanceLock.lock();
try {
result = this.nativeSession;
if (result == null) {
if (check) {
checkOpen();
}
this.nativeSession = result = this.openSession(this.nativeConnection);
}
} finally {
this.instanceLock.unlock();
if (result != null) {
return result;
}

this.instanceLock.lock();
try {
if (this.nativeSession == null) {
this.nativeSession = this.openSession(this.nativeConnection);
}
return this.nativeSession;
} finally {
this.instanceLock.unlock();
}
return result;
}

private Session openSession(org.apache.qpid.protonj2.client.Connection connection) {
Expand All @@ -585,6 +585,27 @@ ScheduledExecutorService scheduledExecutorService() {
return this.environment.scheduledExecutorService();
}

ExecutorService dispatchingExecutorService() {
checkOpen();

ExecutorService result = this.dispatchingExecutorService;
if (result != null) {
return result;
}

this.instanceLock.lock();
try {
if (this.dispatchingExecutorService == null) {
this.dispatchingExecutorService =
Executors.newSingleThreadExecutor(
Utils.threadFactory("dispatching-" + this.name + "-"));
}
return this.dispatchingExecutorService;
} finally {
this.instanceLock.unlock();
}
}

Clock clock() {
return this.environment.clock();
}
Expand Down Expand Up @@ -714,6 +735,14 @@ private void close(Throwable cause) {
for (AmqpConsumer consumer : this.consumers) {
consumer.close();
}
try {
this.dispatchingExecutorService.shutdownNow();
} catch (Exception e) {
LOGGER.info(
"Error while shutting down dispatching executor service for connection '{}': {}",
this.name(),
e.getMessage());
}
try {
org.apache.qpid.protonj2.client.Connection nc = this.nativeConnection;
if (nc != null) {
Expand Down
144 changes: 108 additions & 36 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ final class AmqpConsumer extends ResourceBase implements Consumer {

private volatile ClientReceiver nativeReceiver;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile Future<?> receiveLoop;
private final int initialCredits;
private final MessageHandler messageHandler;
private final Long id;
Expand All @@ -70,6 +69,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
private final SessionHandler sessionHandler;
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
private final ExecutorService dispatchingExecutorService;
private final java.util.function.Consumer<Delivery> nativeHandler;
private final java.util.function.Consumer<ClientException> nativeReceiverCloseHandler;
// native receiver internal state, accessed only in the native executor/scheduler
private ProtonReceiver protonReceiver;
private volatile Scheduler protonExecutor;
Expand All @@ -96,16 +98,34 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
ofNullable(builder.subscriptionListener()).orElse(NO_OP_SUBSCRIPTION_LISTENER);
this.connection = builder.connection();
this.sessionHandler = this.connection.createSessionHandler();

this.dispatchingExecutorService = connection.dispatchingExecutorService();
this.nativeHandler = createNativeHandler(messageHandler);
this.nativeReceiverCloseHandler =
e ->
this.dispatchingExecutorService.submit(
() -> {
// get result to make spotbugs happy
boolean ignored = maybeCloseConsumerOnException(this, e);
});
this.nativeReceiver =
this.createNativeReceiver(
this.sessionHandler.session(),
this.address,
this.linkProperties,
this.filters,
this.subscriptionListener);
this.subscriptionListener,
this.nativeHandler,
this.nativeReceiverCloseHandler);
this.initStateFromNativeReceiver(this.nativeReceiver);
this.metricsCollector = this.connection.metricsCollector();
this.startReceivingLoop();
try {
this.nativeReceiver.addCredit(this.initialCredits);
} catch (ClientException e) {
AmqpException ex = ExceptionUtils.convert(e);
this.close(ex);
throw ex;
}
this.state(OPEN);
this.metricsCollector.openConsumer();
}
Expand Down Expand Up @@ -163,7 +183,9 @@ private ClientReceiver createNativeReceiver(
String address,
Map<String, Object> properties,
Map<String, DescribedType> filters,
SubscriptionListener subscriptionListener) {
SubscriptionListener subscriptionListener,
java.util.function.Consumer<Delivery> nativeHandler,
java.util.function.Consumer<ClientException> closeHandler) {
try {
filters = new LinkedHashMap<>(filters);
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
Expand All @@ -173,6 +195,8 @@ private ClientReceiver createNativeReceiver(
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
.autoAccept(false)
.autoSettle(false)
.handler(nativeHandler)
.closeHandler(closeHandler)
.creditWindow(0)
.properties(properties);
Map<String, Object> localSourceFilters = Collections.emptyMap();
Expand Down Expand Up @@ -201,6 +225,37 @@ private ClientReceiver createNativeReceiver(
}
}

private java.util.function.Consumer<Delivery> createNativeHandler(MessageHandler handler) {
return delivery -> {
this.unsettledMessageCount.incrementAndGet();
this.metricsCollector.consume();
this.dispatchingExecutorService.submit(
() -> {
AmqpMessage message;
try {
message = new AmqpMessage(delivery.message());
} catch (ClientException e) {
LOGGER.warn("Error while decoding message: {}", e.getMessage());
try {
delivery.disposition(DeliveryState.rejected("", ""), true);
} catch (ClientException ex) {
LOGGER.warn("Error while rejecting non-decoded message: {}", ex.getMessage());
}
return;
}
Consumer.Context context =
new DeliveryContext(
delivery,
this.protonExecutor,
this.metricsCollector,
this.unsettledMessageCount,
this.replenishCreditOperation,
this);
handler.handle(context, message);
});
};
}

private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHandler) {
return () -> {
try {
Expand All @@ -217,7 +272,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
this.protonExecutor,
this.metricsCollector,
this.unsettledMessageCount,
this.replenishCreditOperation);
this.replenishCreditOperation,
this);
messageHandler.handle(context, message);
}
}
Expand All @@ -237,11 +293,6 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
};
}

private void startReceivingLoop() {
Runnable receiveTask = createReceiveTask(nativeReceiver, messageHandler);
this.receiveLoop = this.connection.environment().consumerExecutorService().submit(receiveTask);
}

void recoverAfterConnectionFailure() {
this.nativeReceiver =
RetryUtils.callAndMaybeRetry(
Expand All @@ -251,7 +302,9 @@ void recoverAfterConnectionFailure() {
this.address,
this.linkProperties,
this.filters,
this.subscriptionListener),
this.subscriptionListener,
this.nativeHandler,
this.nativeReceiverCloseHandler),
e -> {
boolean shouldRetry =
e instanceof AmqpException.AmqpResourceClosedException
Expand All @@ -267,22 +320,24 @@ void recoverAfterConnectionFailure() {
this.initStateFromNativeReceiver(this.nativeReceiver);
this.pauseStatus.set(PauseStatus.UNPAUSED);
this.unsettledMessageCount.set(0);
startReceivingLoop();
try {
this.nativeReceiver.addCredit(this.initialCredits);
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
}
}

private void close(Throwable cause) {
if (this.closed.compareAndSet(false, true)) {
this.state(CLOSING, cause);
this.connection.removeConsumer(this);
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
}
try {
this.nativeReceiver.close();
this.sessionHandler.close();
} catch (Exception e) {
LOGGER.warn("Error while closing receiver", e);
}

this.state(CLOSED, cause);
this.metricsCollector.closeConsumer();
}
Expand Down Expand Up @@ -372,18 +427,21 @@ private static class DeliveryContext implements Consumer.Context {
private final MetricsCollector metricsCollector;
private final AtomicLong unsettledMessageCount;
private final Runnable replenishCreditOperation;
private final AmqpConsumer consumer;

private DeliveryContext(
Delivery delivery,
Scheduler protonExecutor,
MetricsCollector metricsCollector,
AtomicLong unsettledMessageCount,
Runnable replenishCreditOperation) {
Runnable replenishCreditOperation,
AmqpConsumer consumer) {
this.delivery = delivery;
this.protonExecutor = protonExecutor;
this.metricsCollector = metricsCollector;
this.unsettledMessageCount = unsettledMessageCount;
this.replenishCreditOperation = replenishCreditOperation;
this.consumer = consumer;
}

@Override
Expand All @@ -394,10 +452,8 @@ public void accept() {
delivery.disposition(DeliveryState.accepted(), true);
unsettledMessageCount.decrementAndGet();
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.ACCEPTED);
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
LOGGER.debug("message accept failed: {}", e.getMessage());
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
} catch (Exception e) {
handleException(e, "accept");
}
}
}
Expand All @@ -410,10 +466,8 @@ public void discard() {
delivery.disposition(DeliveryState.rejected("", ""), true);
unsettledMessageCount.decrementAndGet();
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
LOGGER.debug("message discard failed: {}", e.getMessage());
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
} catch (Exception e) {
handleException(e, "discard");
}
}
}
Expand All @@ -428,10 +482,8 @@ public void discard(Map<String, Object> annotations) {
delivery.disposition(DeliveryState.modified(true, true, annotations), true);
unsettledMessageCount.decrementAndGet();
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.DISCARDED);
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
LOGGER.debug("message discard (modified) failed: {}", e.getMessage());
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
} catch (Exception e) {
handleException(e, "discard (modified)");
}
}
}
Expand All @@ -444,10 +496,8 @@ public void requeue() {
delivery.disposition(DeliveryState.released(), true);
unsettledMessageCount.decrementAndGet();
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
LOGGER.debug("message requeue failed: {}", e.getMessage());
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
} catch (Exception e) {
handleException(e, "requeue");
}
}
}
Expand All @@ -462,12 +512,34 @@ public void requeue(Map<String, Object> annotations) {
delivery.disposition(DeliveryState.modified(false, false, annotations), true);
unsettledMessageCount.decrementAndGet();
metricsCollector.consumeDisposition(MetricsCollector.ConsumeDisposition.REQUEUED);
} catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e) {
LOGGER.debug("message requeue (modified) failed: {}", e.getMessage());
} catch (ClientException e) {
throw ExceptionUtils.convert(e);
} catch (Exception e) {
handleException(e, "requeue (modified)");
}
}
}

private void handleException(Exception ex, String operation) {
if (maybeCloseConsumerOnException(this.consumer, ex)) {
return;
}
if (ex instanceof ClientIllegalStateException
|| ex instanceof RejectedExecutionException
|| ex instanceof ClientIOException) {
LOGGER.debug("message {} failed: {}", operation, ex.getMessage());
} else if (ex instanceof ClientException) {
throw ExceptionUtils.convert((ClientException) ex);
}
}
}

private static boolean maybeCloseConsumerOnException(AmqpConsumer consumer, Exception ex) {
if (ex instanceof ClientLinkRemotelyClosedException) {
ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException) ex;
if (ExceptionUtils.notFound(e) || ExceptionUtils.resourceDeleted(e)) {
consumer.close(ExceptionUtils.convert(e));
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class AmqpManagement implements Management {
private final TopologyListener topologyListener;
private final Supplier<String> nameSupplier;
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
// private final AtomicBoolean initializing = new AtomicBoolean(false);
private volatile boolean initializing = false;
private final Lock initializationLock = new ReentrantLock();
private final Duration receiveLoopIdleTimeout;
Expand Down Expand Up @@ -208,7 +207,7 @@ void init() {
if (!this.initializing) {
try {
initializationLock.lock();
if (!this.initializing) {
if (!this.initializing && this.state() != OPEN) {
this.initializing = true;
LOGGER.debug("Initializing management ({}).", this);
this.state(UNAVAILABLE);
Expand Down
Loading