Skip to content

Close consumer gracefully in RPC server #166

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// [email protected].
package com.rabbitmq.client.amqp;

import java.time.Duration;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -62,6 +63,18 @@ public interface RpcServerBuilder {
*/
RpcServerBuilder replyPostProcessor(BiFunction<Message, Object, Message> replyPostProcessor);

/**
* The time the server waits for all outstanding requests to be processed before closing.
*
* <p>Default is 60 seconds.
*
* <p>Set the duration to {@link Duration#ZERO} to close immediately.
*
* @param closeTimeout close timeout
* @return this builder instance
*/
RpcServerBuilder closeTimeout(Duration closeTimeout);

/**
* Create the configured instance.
*
Expand Down
27 changes: 27 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ class AmqpRpcServer implements RpcServer {
private final Function<Message, Object> correlationIdExtractor;
private final BiFunction<Message, Object, Message> replyPostProcessor;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final Duration closeTimeout;

AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
this.connection = builder.connection();
this.closeTimeout = builder.closeTimeout();
Handler handler = builder.handler();

this.publisher = this.connection.publisherBuilder().build();
Expand Down Expand Up @@ -117,6 +119,15 @@ public void close() {
if (this.closed.compareAndSet(false, true)) {
this.connection.removeRpcServer(this);
try {
this.maybeWaitForUnsettledMessages();
} catch (Exception e) {
LOGGER.warn("Error while waiting for unsettled messages in RPC server: {}", e.getMessage());
}
try {
long unsettledMessageCount = this.consumer.unsettledMessageCount();
if (unsettledMessageCount > 0) {
LOGGER.info("Closing RPC server with {} unsettled message(s)", unsettledMessageCount);
}
this.consumer.close();
} catch (Exception e) {
LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage());
Expand All @@ -143,4 +154,20 @@ private void sendReply(Message reply) {
LOGGER.info("Error while processing RPC request: {}", e.getMessage());
}
}

private void maybeWaitForUnsettledMessages() {
if (this.closeTimeout.toNanos() > 0) {
Duration waited = Duration.ZERO;
Duration waitStep = Duration.ofMillis(10);
while (this.consumer.unsettledMessageCount() > 0 && waited.compareTo(this.closeTimeout) < 0) {
try {
Thread.sleep(100);
waited = waited.plus(waitStep);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
}
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ static class AmqpRpcServerBuilder implements RpcServerBuilder {
private RpcServer.Handler handler;
private Function<Message, Object> correlationIdExtractor;
private BiFunction<Message, Object, Message> replyPostProcessor;
private Duration closeTimeout = Duration.ofSeconds(60);

AmqpRpcServerBuilder(AmqpConnection connection) {
this.connection = connection;
Expand Down Expand Up @@ -180,6 +181,12 @@ public RpcServerBuilder replyPostProcessor(
return this;
}

@Override
public RpcServerBuilder closeTimeout(Duration closeTimeout) {
this.closeTimeout = closeTimeout;
return this;
}

@Override
public RpcServer build() {
return this.connection.createRpcServer(this);
Expand All @@ -204,5 +211,9 @@ Function<Message, Object> correlationIdExtractor() {
BiFunction<Message, Object, Message> replyPostProcessor() {
return this.replyPostProcessor;
}

Duration closeTimeout() {
return this.closeTimeout;
}
}
}
92 changes: 92 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,98 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info)
}
}

@Test
void rpcServerShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInternalConsumer()
throws ExecutionException, InterruptedException, TimeoutException {
try (Connection clientConnection = environment.connectionBuilder().build();
Connection serverConnection = environment.connectionBuilder().build()) {

String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();

RpcClient rpcClient =
clientConnection
.rpcClientBuilder()
.requestAddress()
.queue(requestQueue)
.rpcClient()
.build();

Sync receivedSync = sync();
RpcServer rpcServer =
serverConnection
.rpcServerBuilder()
.requestQueue(requestQueue)
.handler(
(ctx, request) -> {
receivedSync.down();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return HANDLER.handle(ctx, request);
})
.build();

String request = UUID.randomUUID().toString();
CompletableFuture<Message> responseFuture =
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
assertThat(receivedSync).completes();
rpcServer.close();
Message response = responseFuture.get(10, TimeUnit.SECONDS);
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
}
}

@Test
void outstandingRequestShouldTimeOutWhenRpcServerDoesNotCloseConsumerGracefully()
throws ExecutionException, InterruptedException, TimeoutException {
try (Connection clientConnection = environment.connectionBuilder().build();
Connection serverConnection = environment.connectionBuilder().build()) {

String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();

Duration requestTimeout = Duration.ofSeconds(1);
RpcClient rpcClient =
clientConnection
.rpcClientBuilder()
.requestTimeout(requestTimeout)
.requestAddress()
.queue(requestQueue)
.rpcClient()
.build();

Sync receivedSync = sync();
RpcServer rpcServer =
serverConnection
.rpcServerBuilder()
.closeTimeout(Duration.ZERO) // close the consumer immediately
.requestQueue(requestQueue)
.handler(
(ctx, request) -> {
receivedSync.down();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return HANDLER.handle(ctx, request);
})
.build();

String request = UUID.randomUUID().toString();
CompletableFuture<Message> responseFuture =
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
assertThat(receivedSync).completes();
rpcServer.close();
assertThatThrownBy(
() -> responseFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS))
.isInstanceOf(ExecutionException.class)
.hasCauseInstanceOf(AmqpException.class);
waitAtMost(() -> serverConnection.management().queueInfo(requestQueue).messageCount() == 1);
}
}

private static AmqpConnectionBuilder connectionBuilder() {
return (AmqpConnectionBuilder) environment.connectionBuilder();
}
Expand Down