Skip to content

Commit e20b284

Browse files
committed
Test outstanding requests complete on RPC client closing
1 parent 6994cf2 commit e20b284

File tree

1 file changed

+73
-2
lines changed

1 file changed

+73
-2
lines changed

src/test/java/com/rabbitmq/model/amqp/RpcTest.java

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.rabbitmq.model.BackOffDelayPolicy.fixed;
2121
import static com.rabbitmq.model.amqp.Cli.closeConnection;
2222
import static com.rabbitmq.model.amqp.TestUtils.environmentBuilder;
23+
import static com.rabbitmq.model.amqp.TestUtils.waitAtMost;
2324
import static java.nio.charset.StandardCharsets.UTF_8;
2425
import static java.time.Duration.ofMillis;
2526
import static org.assertj.core.api.Assertions.assertThat;
@@ -283,8 +284,7 @@ void rpcShouldRecoverAfterConnectionIsClosed()
283284
}
284285

285286
@Test
286-
void poisonRequestsShouldTimeout()
287-
throws ExecutionException, InterruptedException, TimeoutException {
287+
void poisonRequestsShouldTimeout() {
288288
try (Connection clientConnection = environment.connectionBuilder().build();
289289
Connection serverConnection = environment.connectionBuilder().build()) {
290290

@@ -350,6 +350,77 @@ void poisonRequestsShouldTimeout()
350350
}
351351
}
352352

353+
@Test
354+
void outstandingRequestsShouldCompleteExceptionallyOnRpcClientClosing() throws Exception {
355+
try (Connection clientConnection = environment.connectionBuilder().build();
356+
Connection serverConnection = environment.connectionBuilder().build()) {
357+
358+
String requestQueue = serverConnection.management().queue().exclusive(true).declare().name();
359+
360+
serverConnection
361+
.rpcServerBuilder()
362+
.requestQueue(requestQueue)
363+
.handler(
364+
(ctx, msg) -> {
365+
String request = new String(msg.body(), UTF_8);
366+
if (request.contains("poison")) {
367+
return null;
368+
} else {
369+
return HANDLER.handle(ctx, msg);
370+
}
371+
})
372+
.replyPostProcessor((r, corrId) -> r == null ? null : r.correlationId(corrId))
373+
.build();
374+
375+
RpcClient rpcClient =
376+
clientConnection
377+
.rpcClientBuilder()
378+
.requestAddress()
379+
.queue(requestQueue)
380+
.rpcClient()
381+
.build();
382+
383+
int requestCount = 100;
384+
AtomicInteger expectedPoisonCount = new AtomicInteger();
385+
AtomicInteger timedOutRequestCount = new AtomicInteger();
386+
AtomicInteger completedRequestCount = new AtomicInteger();
387+
Random random = new Random();
388+
CountDownLatch allRequestSubmitted = new CountDownLatch(requestCount);
389+
IntStream.range(0, requestCount)
390+
.forEach(
391+
ignored -> {
392+
boolean poison = random.nextBoolean();
393+
String request;
394+
if (poison) {
395+
request = "poison";
396+
expectedPoisonCount.incrementAndGet();
397+
} else {
398+
request = UUID.randomUUID().toString();
399+
}
400+
executorService.submit(
401+
() -> {
402+
CompletableFuture<Message> responseFuture =
403+
rpcClient.publish(rpcClient.message(request.getBytes(UTF_8)));
404+
responseFuture.handle(
405+
(msg, ex) -> {
406+
if (ex == null) {
407+
completedRequestCount.incrementAndGet();
408+
} else {
409+
timedOutRequestCount.incrementAndGet();
410+
}
411+
return null;
412+
});
413+
});
414+
allRequestSubmitted.countDown();
415+
});
416+
TestUtils.assertThat(allRequestSubmitted).completes();
417+
waitAtMost(() -> completedRequestCount.get() == requestCount - expectedPoisonCount.get());
418+
assertThat(timedOutRequestCount).hasValue(0);
419+
rpcClient.close();
420+
assertThat(timedOutRequestCount).hasPositiveValue().hasValue(expectedPoisonCount.get());
421+
}
422+
}
423+
353424
private static AmqpConnectionBuilder connectionBuilder() {
354425
return (AmqpConnectionBuilder) environment.connectionBuilder();
355426
}

0 commit comments

Comments
 (0)