Skip to content

Commit 3b6019e

Browse files
authored
Merge pull request #166 from rabbitmq/close-consumer-gracefully-in-rpc-server
Close consumer gracefully in RPC server
2 parents d4a146b + 33e9fcd commit 3b6019e

File tree

4 files changed

+143
-0
lines changed

4 files changed

+143
-0
lines changed

src/main/java/com/rabbitmq/client/amqp/RpcServerBuilder.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp;
1919

20+
import java.time.Duration;
2021
import java.util.function.BiFunction;
2122
import java.util.function.Function;
2223

@@ -62,6 +63,18 @@ public interface RpcServerBuilder {
6263
*/
6364
RpcServerBuilder replyPostProcessor(BiFunction<Message, Object, Message> replyPostProcessor);
6465

66+
/**
67+
* The time the server waits for all outstanding requests to be processed before closing.
68+
*
69+
* <p>Default is 60 seconds.
70+
*
71+
* <p>Set the duration to {@link Duration#ZERO} to close immediately.
72+
*
73+
* @param closeTimeout close timeout
74+
* @return this builder instance
75+
*/
76+
RpcServerBuilder closeTimeout(Duration closeTimeout);
77+
6578
/**
6679
* Create the configured instance.
6780
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ class AmqpRpcServer implements RpcServer {
5151
private final Function<Message, Object> correlationIdExtractor;
5252
private final BiFunction<Message, Object, Message> replyPostProcessor;
5353
private final AtomicBoolean closed = new AtomicBoolean(false);
54+
private final Duration closeTimeout;
5455

5556
AmqpRpcServer(RpcSupport.AmqpRpcServerBuilder builder) {
5657
this.connection = builder.connection();
58+
this.closeTimeout = builder.closeTimeout();
5759
Handler handler = builder.handler();
5860

5961
this.publisher = this.connection.publisherBuilder().build();
@@ -117,6 +119,15 @@ public void close() {
117119
if (this.closed.compareAndSet(false, true)) {
118120
this.connection.removeRpcServer(this);
119121
try {
122+
this.maybeWaitForUnsettledMessages();
123+
} catch (Exception e) {
124+
LOGGER.warn("Error while waiting for unsettled messages in RPC server: {}", e.getMessage());
125+
}
126+
try {
127+
long unsettledMessageCount = this.consumer.unsettledMessageCount();
128+
if (unsettledMessageCount > 0) {
129+
LOGGER.info("Closing RPC server with {} unsettled message(s)", unsettledMessageCount);
130+
}
120131
this.consumer.close();
121132
} catch (Exception e) {
122133
LOGGER.warn("Error while closing RPC server consumer: {}", e.getMessage());
@@ -143,4 +154,20 @@ private void sendReply(Message reply) {
143154
LOGGER.info("Error while processing RPC request: {}", e.getMessage());
144155
}
145156
}
157+
158+
private void maybeWaitForUnsettledMessages() {
159+
if (this.closeTimeout.toNanos() > 0) {
160+
Duration waited = Duration.ZERO;
161+
Duration waitStep = Duration.ofMillis(10);
162+
while (this.consumer.unsettledMessageCount() > 0 && waited.compareTo(this.closeTimeout) < 0) {
163+
try {
164+
Thread.sleep(100);
165+
waited = waited.plus(waitStep);
166+
} catch (InterruptedException e) {
167+
Thread.currentThread().interrupt();
168+
break;
169+
}
170+
}
171+
}
172+
}
146173
}

src/main/java/com/rabbitmq/client/amqp/impl/RpcSupport.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ static class AmqpRpcServerBuilder implements RpcServerBuilder {
149149
private RpcServer.Handler handler;
150150
private Function<Message, Object> correlationIdExtractor;
151151
private BiFunction<Message, Object, Message> replyPostProcessor;
152+
private Duration closeTimeout = Duration.ofSeconds(60);
152153

153154
AmqpRpcServerBuilder(AmqpConnection connection) {
154155
this.connection = connection;
@@ -180,6 +181,12 @@ public RpcServerBuilder replyPostProcessor(
180181
return this;
181182
}
182183

184+
@Override
185+
public RpcServerBuilder closeTimeout(Duration closeTimeout) {
186+
this.closeTimeout = closeTimeout;
187+
return this;
188+
}
189+
183190
@Override
184191
public RpcServer build() {
185192
return this.connection.createRpcServer(this);
@@ -204,5 +211,9 @@ Function<Message, Object> correlationIdExtractor() {
204211
BiFunction<Message, Object, Message> replyPostProcessor() {
205212
return this.replyPostProcessor;
206213
}
214+
215+
Duration closeTimeout() {
216+
return this.closeTimeout;
217+
}
207218
}
208219
}

src/test/java/com/rabbitmq/client/amqp/impl/RpcTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,98 @@ void errorDuringProcessingShouldDiscardMessageAndDeadLetterIfSet(TestInfo info)
476476
}
477477
}
478478

479+
@Test
480+
void rpcServerShouldWaitForAllOutstandingMessagesToBeProcessedBeforeClosingInternalConsumer()
481+
throws ExecutionException, InterruptedException, TimeoutException {
482+
try (Connection clientConnection = environment.connectionBuilder().build();
483+
Connection serverConnection = environment.connectionBuilder().build()) {
484+
485+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
486+
487+
RpcClient rpcClient =
488+
clientConnection
489+
.rpcClientBuilder()
490+
.requestAddress()
491+
.queue(requestQueue)
492+
.rpcClient()
493+
.build();
494+
495+
Sync receivedSync = sync();
496+
RpcServer rpcServer =
497+
serverConnection
498+
.rpcServerBuilder()
499+
.requestQueue(requestQueue)
500+
.handler(
501+
(ctx, request) -> {
502+
receivedSync.down();
503+
try {
504+
Thread.sleep(1000L);
505+
} catch (InterruptedException e) {
506+
throw new RuntimeException(e);
507+
}
508+
return HANDLER.handle(ctx, request);
509+
})
510+
.build();
511+
512+
String request = UUID.randomUUID().toString();
513+
CompletableFuture<Message> responseFuture =
514+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
515+
assertThat(receivedSync).completes();
516+
rpcServer.close();
517+
Message response = responseFuture.get(10, TimeUnit.SECONDS);
518+
assertThat(response.body()).asString(UTF_8).isEqualTo(process(request));
519+
}
520+
}
521+
522+
@Test
523+
void outstandingRequestShouldTimeOutWhenRpcServerDoesNotCloseConsumerGracefully()
524+
throws ExecutionException, InterruptedException, TimeoutException {
525+
try (Connection clientConnection = environment.connectionBuilder().build();
526+
Connection serverConnection = environment.connectionBuilder().build()) {
527+
528+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
529+
530+
Duration requestTimeout = Duration.ofSeconds(1);
531+
RpcClient rpcClient =
532+
clientConnection
533+
.rpcClientBuilder()
534+
.requestTimeout(requestTimeout)
535+
.requestAddress()
536+
.queue(requestQueue)
537+
.rpcClient()
538+
.build();
539+
540+
Sync receivedSync = sync();
541+
RpcServer rpcServer =
542+
serverConnection
543+
.rpcServerBuilder()
544+
.closeTimeout(Duration.ZERO) // close the consumer immediately
545+
.requestQueue(requestQueue)
546+
.handler(
547+
(ctx, request) -> {
548+
receivedSync.down();
549+
try {
550+
Thread.sleep(1000L);
551+
} catch (InterruptedException e) {
552+
throw new RuntimeException(e);
553+
}
554+
return HANDLER.handle(ctx, request);
555+
})
556+
.build();
557+
558+
String request = UUID.randomUUID().toString();
559+
CompletableFuture<Message> responseFuture =
560+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
561+
assertThat(receivedSync).completes();
562+
rpcServer.close();
563+
assertThatThrownBy(
564+
() -> responseFuture.get(requestTimeout.multipliedBy(3).toMillis(), MILLISECONDS))
565+
.isInstanceOf(ExecutionException.class)
566+
.hasCauseInstanceOf(AmqpException.class);
567+
waitAtMost(() -> serverConnection.management().queueInfo(requestQueue).messageCount() == 1);
568+
}
569+
}
570+
479571
private static AmqpConnectionBuilder connectionBuilder() {
480572
return (AmqpConnectionBuilder) environment.connectionBuilder();
481573
}

0 commit comments

Comments
 (0)