Skip to content

Commit b44b4fb

Browse files
Zhiyang.Wang1Wzy19930507
authored andcommitted
GH-1189: support Mono and Future
* Support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc`
1 parent 2e59574 commit b44b4fb

File tree

9 files changed

+527
-17
lines changed

9 files changed

+527
-17
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
[[async-returns]]
2+
= Asynchronous `@KafkaListener` Return Types
3+
4+
`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture<?>` and `Mono<?>`, letting the reply be sent asynchronously.
5+
6+
[source, java]
7+
----
8+
@KafkaListener(id = "myListener", topics = "myTopic")
9+
public CompletableFuture<String> listen(String data) {
10+
...
11+
CompletableFuture<String> future = new CompletableFuture<>();
12+
future.complete("done");
13+
return future;
14+
}
15+
----
16+
17+
[source, java]
18+
----
19+
@KafkaListener(id = "myListener", topics = "myTopic")
20+
public Mono<Void> listen(String data) {
21+
...
22+
return Mono.empty();
23+
}
24+
----
25+
26+
IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes.
27+
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
28+
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.
29+
30+
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure.
31+
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His
1212

1313
A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
1414
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
15-
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
15+
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
16+
17+
[[x32-async-return]]
18+
=== Async @KafkaListener Return
19+
20+
`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture<?>` and `Mono<?>`.
21+
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ private String getReplyTopic() {
147147
}
148148
String topic = destinations.length == 1 ? destinations[0] : "";
149149
BeanFactory beanFactory = getBeanFactory();
150-
if (beanFactory instanceof ConfigurableListableBeanFactory) {
151-
topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic);
150+
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
151+
topic = configurableListableBeanFactory.resolveEmbeddedValue(topic);
152152
if (topic != null) {
153153
topic = resolve(topic);
154154
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3395,6 +3395,11 @@ public void nack(Duration sleep) {
33953395
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
33963396
}
33973397

3398+
@Override
3399+
public boolean isAsyncAcks() {
3400+
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
3401+
}
3402+
33983403
@Override
33993404
public String toString() {
34003405
return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
@@ -3493,6 +3498,11 @@ public void nack(int index, Duration sleep) {
34933498
processAcks(new ConsumerRecords<K, V>(newRecords));
34943499
}
34953500

3501+
@Override
3502+
public boolean isAsyncAcks() {
3503+
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
3504+
}
3505+
34963506
@Override
34973507
public String toString() {
34983508
return "Acknowledgment for " + this.records;

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 85 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Iterator;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.concurrent.CompletableFuture;
2930
import java.util.stream.Collectors;
3031

3132
import org.apache.commons.logging.LogFactory;
@@ -47,6 +48,7 @@
4748
import org.springframework.kafka.listener.ConsumerSeekAware;
4849
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
4950
import org.springframework.kafka.listener.ListenerExecutionFailedException;
51+
import org.springframework.kafka.listener.MessageListener;
5052
import org.springframework.kafka.support.Acknowledgment;
5153
import org.springframework.kafka.support.KafkaHeaders;
5254
import org.springframework.kafka.support.KafkaNull;
@@ -66,9 +68,12 @@
6668
import org.springframework.messaging.support.GenericMessage;
6769
import org.springframework.messaging.support.MessageBuilder;
6870
import org.springframework.util.Assert;
71+
import org.springframework.util.ClassUtils;
6972
import org.springframework.util.ObjectUtils;
7073
import org.springframework.util.StringUtils;
7174

75+
import reactor.core.publisher.Mono;
76+
7277
/**
7378
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
7479
* providing the necessary infrastructure to extract the payload of a
@@ -95,6 +100,9 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
95100
*/
96101
protected static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR
97102

103+
private static final boolean monoPresent =
104+
ClassUtils.isPresent("reactor.core.publisher.Mono", MessageListener.class.getClassLoader());
105+
98106
private final Object bean;
99107

100108
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
@@ -368,7 +376,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
368376
try {
369377
Object result = invokeHandler(records, acknowledgment, message, consumer);
370378
if (result != null) {
371-
handleResult(result, records, message);
379+
handleResult(result, records, acknowledgment, consumer, message);
372380
}
373381
}
374382
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
@@ -436,19 +444,58 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me
436444
* response message to the SendTo topic.
437445
* @param resultArg the result object to handle (never <code>null</code>)
438446
* @param request the original request message
447+
* @param acknowledgment the acknowledgment to manual ack
448+
* @param consumer the consumer to handler error
439449
* @param source the source data for the method invocation - e.g.
440450
* {@code o.s.messaging.Message<?>}; may be null
441451
*/
442-
protected void handleResult(Object resultArg, Object request, Object source) {
452+
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment,
453+
Consumer<?, ?> consumer, @Nullable Message<?> source) {
454+
443455
this.logger.debug(() -> "Listener method returned result [" + resultArg
444456
+ "] - generating response message for it");
445-
boolean isInvocationResult = resultArg instanceof InvocationResult;
446-
Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg;
447457
String replyTopic = evaluateReplyTopic(request, source, resultArg);
448458
Assert.state(replyTopic == null || this.replyTemplate != null,
449459
"a KafkaTemplate is required to support replies");
450-
sendResponse(result, replyTopic, source, isInvocationResult
451-
? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType);
460+
Object result;
461+
boolean messageReturnType;
462+
if (resultArg instanceof InvocationResult invocationResult) {
463+
result = invocationResult.getResult();
464+
messageReturnType = invocationResult.isMessageReturnType();
465+
}
466+
else {
467+
result = resultArg;
468+
messageReturnType = this.messageReturnType;
469+
}
470+
if (result instanceof CompletableFuture<?> completable) {
471+
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
472+
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
473+
+ "otherwise the container will ack the message immediately");
474+
}
475+
completable.whenComplete((r, t) -> {
476+
if (t == null) {
477+
asyncSuccess(r, replyTopic, source, messageReturnType);
478+
acknowledge(acknowledgment);
479+
}
480+
else {
481+
asyncFailure(request, acknowledgment, consumer, t, source);
482+
}
483+
});
484+
}
485+
else if (monoPresent && result instanceof Mono<?> mono) {
486+
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
487+
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type; " +
488+
"otherwise the container will ack the message immediately");
489+
}
490+
mono.subscribe(
491+
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
492+
t -> asyncFailure(request, acknowledgment, consumer, t, source),
493+
() -> acknowledge(acknowledgment)
494+
);
495+
}
496+
else {
497+
sendResponse(result, replyTopic, source, messageReturnType);
498+
}
452499
}
453500

454501
@Nullable
@@ -586,6 +633,37 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
586633
this.replyTemplate.send(builder.build());
587634
}
588635

636+
protected void asyncSuccess(@Nullable Object result, String replyTopic, Message<?> source, boolean returnTypeMessage) {
637+
if (result == null) {
638+
if (this.logger.isDebugEnabled()) {
639+
this.logger.debug("Async result is null, ignoring");
640+
}
641+
}
642+
else {
643+
sendResponse(result, replyTopic, source, returnTypeMessage);
644+
}
645+
}
646+
647+
protected void acknowledge(@Nullable Acknowledgment acknowledgment) {
648+
if (acknowledgment != null) {
649+
acknowledgment.acknowledge();
650+
}
651+
}
652+
653+
protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
654+
Throwable t, Message<?> source) {
655+
try {
656+
handleException(request, acknowledgment, consumer, source,
657+
new ListenerExecutionFailedException(createMessagingErrorMessage(
658+
"Async Fail", source.getPayload()), t));
659+
return;
660+
}
661+
catch (Exception ex) {
662+
}
663+
this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source);
664+
acknowledge(acknowledgment);
665+
}
666+
589667
protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
590668
Message<?> message, ListenerExecutionFailedException e) {
591669

@@ -596,7 +674,7 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle
596674
}
597675
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
598676
if (result != null) {
599-
handleResult(result, records, message);
677+
handleResult(result, records, acknowledgment, consumer, message);
600678
}
601679
}
602680
catch (Exception ex) {

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,8 @@ default void nack(int index, Duration sleep) {
8181
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
8282
}
8383

84+
default boolean isAsyncAcks() {
85+
return false;
86+
}
87+
8488
}

0 commit comments

Comments
 (0)