Skip to content

Commit b44d742

Browse files
authored
GH-1189: Asynchronous server-side processing in a request/reply
Fixes: #1189 * Refactor `MessagingMessageListenerAdapter` * move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter` * move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter` * add `@Nullable` to `KafkaListenerErrorHandler` * GH-1189: support `Mono` and `Future` * Support auto ack at async return scenario when manual commit * Support `KafkaListenerErrorHandler` * Add warn log if the container is not configured for out-of-order manual commit * Add async return test in `BatchMessagingMessageListenerAdapterTests` and `MessagingMessageListenerAdapterTests` * Add unit test async listener with `@SendTo` in `AsyncListenerTests` * Add `async-returns.adoc` and `whats-new.adoc` * GH-1189: support Kotlin `suspend` * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3` * auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc * GH-1189: `@SendTo` for `@KafkaHandler` after error is handled * Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost. * add javadoc in `AdapterUtils` * move class from package `annotation` to package `adapter` * re name bar,baz in BatchMessagingMessageListenerAdapterTests * poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests` * poblish doc async-returns.adoc and nav.adoc * rename `HandlerMethodDetect` to `AsyncRepliesAware` * fix javadoc in `ContinuationHandlerMethodArgumentResolver` * After kafka client 2.4 producer uses sticky partition, its randomly chose partition and topic default partitions is 2, configure that `@EmbeddedKafka `to provide just one partition per topic. * javadoc in `AsyncRepliesAware` * fix test in EnableKafkaKotlinCoroutinesTests * polish adoc * polish `DelegatingInvocableHandler` and add javadoc * polish `HandlerAdapter` * change `InvocationResult` to record * Optimization `MessagingMessageListenerAdapter.asyncFailure` * Mention version in the doc for async return types
1 parent 3ee58fb commit b44d742

26 files changed

+1316
-150
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ ext {
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.10.1'
6262
kafkaVersion = '3.6.1'
63+
kotlinCoroutinesVersion = '1.7.3'
6364
log4jVersion = '2.22.1'
6465
micrometerDocsVersion = '1.0.2'
6566
micrometerVersion = '1.13.0-SNAPSHOT'
@@ -278,6 +279,7 @@ project ('spring-kafka') {
278279
}
279280
api "org.apache.kafka:kafka-clients:$kafkaVersion"
280281
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
282+
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
281283
optionalApi 'com.fasterxml.jackson.core:jackson-core'
282284
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
283285
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'

spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
**** xref:kafka/receiving-messages/message-listeners.adoc[]
1212
**** xref:kafka/receiving-messages/message-listener-container.adoc[]
1313
**** xref:kafka/receiving-messages/ooo-commits.adoc[]
14+
**** xref:kafka/receiving-messages/async-returns.adoc[]
1415
**** xref:kafka/receiving-messages/listener-annotation.adoc[]
1516
**** xref:kafka/receiving-messages/listener-group-id.adoc[]
1617
**** xref:kafka/receiving-messages/container-thread-naming.adoc[]
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[[async-returns]]
2+
= Asynchronous `@KafkaListener` Return Types
3+
4+
Starting with version 3.2, `@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
5+
return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions.
6+
7+
[source, java]
8+
----
9+
@KafkaListener(id = "myListener", topics = "myTopic")
10+
public CompletableFuture<String> listen(String data) {
11+
...
12+
CompletableFuture<String> future = new CompletableFuture<>();
13+
future.complete("done");
14+
return future;
15+
}
16+
----
17+
18+
[source, java]
19+
----
20+
@KafkaListener(id = "myListener", topics = "myTopic")
21+
public Mono<Void> listen(String data) {
22+
...
23+
return Mono.empty();
24+
}
25+
----
26+
27+
IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes.
28+
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
29+
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.
30+
31+
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
32+
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His
1212

1313
A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
1414
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
15-
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
15+
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
16+
17+
[[x32-async-return]]
18+
=== Async @KafkaListener Return
19+
20+
`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions.
21+
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,6 +87,7 @@
8787
import org.springframework.kafka.listener.ContainerGroupSequencer;
8888
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
8989
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
90+
import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory;
9091
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
9192
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
9293
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
@@ -1155,7 +1156,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() {
11551156
}
11561157

11571158
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
1158-
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
1159+
DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory();
11591160
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
11601161
if (validator != null) {
11611162
defaultFactory.setValidator(validator);
@@ -1170,8 +1171,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
11701171

11711172
List<HandlerMethodArgumentResolver> customArgumentsResolver =
11721173
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
1173-
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
1174-
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
11751174
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
11761175

11771176
defaultFactory.afterPropertiesSet();

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ private String getReplyTopic() {
147147
}
148148
String topic = destinations.length == 1 ? destinations[0] : "";
149149
BeanFactory beanFactory = getBeanFactory();
150-
if (beanFactory instanceof ConfigurableListableBeanFactory) {
151-
topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic);
150+
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
151+
topic = configurableListableBeanFactory.resolveEmbeddedValue(topic);
152152
if (topic != null) {
153153
topic = resolve(topic);
154154
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,6 +72,7 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
7272
* @return the return value is ignored unless the annotated method has a
7373
* {@code @SendTo} annotation.
7474
*/
75+
@Nullable
7576
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
7677
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {
7778

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.springframework.kafka.listener.ContainerProperties.AckMode;
105105
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
106106
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
107+
import org.springframework.kafka.listener.adapter.AsyncRepliesAware;
107108
import org.springframework.kafka.support.Acknowledgment;
108109
import org.springframework.kafka.support.KafkaHeaders;
109110
import org.springframework.kafka.support.KafkaUtils;
@@ -160,6 +161,7 @@
160161
* @author Francois Rosiere
161162
* @author Daniel Gentes
162163
* @author Soby Chacko
164+
* @author Wang Zhiyang
163165
*/
164166
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
165167
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -660,6 +662,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
660662

661663
private final boolean wantsFullRecords;
662664

665+
private final boolean asyncReplies;
666+
663667
private final boolean autoCommit;
664668

665669
private final boolean isManualAck;
@@ -850,6 +854,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
850854
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
851855
ObservationRegistry observationRegistry) {
852856

857+
this.asyncReplies = listener instanceof AsyncRepliesAware hmd && hmd.isAsyncReplies()
858+
|| this.containerProperties.isAsyncAcks();
853859
AckMode ackMode = determineAckMode();
854860
this.isManualAck = ackMode.equals(AckMode.MANUAL);
855861
this.isCountAck = ackMode.equals(AckMode.COUNT)
@@ -860,12 +866,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
860866
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
861867
this.isRecordAck = ackMode.equals(AckMode.RECORD);
862868
this.offsetsInThisBatch =
863-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
864-
? new HashMap<>()
869+
this.isAnyManualAck && this.asyncReplies
870+
? new ConcurrentHashMap<>()
865871
: null;
866872
this.deferredOffsets =
867-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
868-
? new HashMap<>()
873+
this.isAnyManualAck && this.asyncReplies
874+
? new ConcurrentHashMap<>()
869875
: null;
870876

871877
this.observationRegistry = observationRegistry;
@@ -904,8 +910,7 @@ else if (listener instanceof MessageListener) {
904910
else {
905911
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
906912
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
907-
+ "Acknowledging"
908-
+ " not " + listener.getClass().getName());
913+
+ "Acknowledging not " + listener.getClass().getName());
909914
}
910915
this.listenerType = listenerType;
911916
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
@@ -928,18 +933,15 @@ else if (listener instanceof MessageListener) {
928933
this.logger.info(toString());
929934
}
930935
ApplicationContext applicationContext = getApplicationContext();
936+
ClassLoader classLoader = applicationContext == null
937+
? getClass().getClassLoader()
938+
: applicationContext.getClassLoader();
931939
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
932940
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
933-
consumerProperties, false,
934-
applicationContext == null
935-
? getClass().getClassLoader()
936-
: applicationContext.getClassLoader());
941+
consumerProperties, false, classLoader);
937942
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
938943
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
939-
consumerProperties, true,
940-
applicationContext == null
941-
? getClass().getClassLoader()
942-
: applicationContext.getClassLoader());
944+
consumerProperties, true, classLoader);
943945
this.syncCommitTimeout = determineSyncCommitTimeout();
944946
if (this.containerProperties.getSyncCommitTimeout() == null) {
945947
// update the property, so we can use it directly from code elsewhere
@@ -965,6 +967,9 @@ private AckMode determineAckMode() {
965967
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
966968
ackMode = AckMode.MANUAL;
967969
}
970+
if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) {
971+
ackMode = AckMode.MANUAL;
972+
}
968973
return ackMode;
969974
}
970975

@@ -3389,12 +3394,17 @@ public void acknowledge() {
33893394
public void nack(Duration sleep) {
33903395
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33913396
"nack() can only be called on the consumer thread");
3392-
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3393-
"nack() is not supported with out-of-order commits (asyncAcks=true)");
3397+
Assert.state(!ListenerConsumer.this.asyncReplies,
3398+
"nack() is not supported with out-of-order commits");
33943399
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33953400
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
33963401
}
33973402

3403+
@Override
3404+
public boolean isOutOfOrderCommit() {
3405+
return ListenerConsumer.this.asyncReplies;
3406+
}
3407+
33983408
@Override
33993409
public String toString() {
34003410
return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
@@ -3469,8 +3479,8 @@ public void acknowledge(int index) {
34693479
public void nack(int index, Duration sleep) {
34703480
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
34713481
"nack() can only be called on the consumer thread");
3472-
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3473-
"nack() is not supported with out-of-order commits (asyncAcks=true)");
3482+
Assert.state(!ListenerConsumer.this.asyncReplies,
3483+
"nack() is not supported with out-of-order commits");
34743484
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
34753485
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
34763486
ListenerConsumer.this.nackIndex = index;
@@ -3493,6 +3503,11 @@ public void nack(int index, Duration sleep) {
34933503
processAcks(new ConsumerRecords<K, V>(newRecords));
34943504
}
34953505

3506+
@Override
3507+
public boolean isOutOfOrderCommit() {
3508+
return ListenerConsumer.this.asyncReplies;
3509+
}
3510+
34963511
@Override
34973512
public String toString() {
34983513
return "Acknowledgment for " + this.records;

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
import org.apache.kafka.clients.consumer.ConsumerRecord;
2022
import org.apache.kafka.clients.producer.RecordMetadata;
2123
import org.apache.kafka.common.TopicPartition;
@@ -24,11 +26,15 @@
2426
import org.springframework.expression.common.TemplateParserContext;
2527
import org.springframework.kafka.support.KafkaHeaders;
2628
import org.springframework.lang.Nullable;
29+
import org.springframework.util.ClassUtils;
30+
31+
import reactor.core.publisher.Mono;
2732

2833
/**
2934
* Utilities for listener adapters.
3035
*
3136
* @author Gary Russell
37+
* @author Wang Zhiyang
3238
* @since 2.5
3339
*
3440
*/
@@ -40,6 +46,9 @@ public final class AdapterUtils {
4046
*/
4147
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
4248

49+
private static final boolean MONO_PRESENT =
50+
ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader());
51+
4352
private AdapterUtils() {
4453
}
4554

@@ -85,4 +94,34 @@ public static String getDefaultReplyTopicExpression() {
8594
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
8695
}
8796

97+
/**
98+
* Return the true when return types are asynchronous.
99+
* @param resultType {@code InvocableHandlerMethod} return type.
100+
* @return type is {@code Mono} or {@code CompletableFuture}.
101+
* @since 3.2
102+
*/
103+
public static boolean isAsyncReply(Class<?> resultType) {
104+
return isMono(resultType) || isCompletableFuture(resultType);
105+
}
106+
107+
/**
108+
* Return the true when type is {@code Mono}.
109+
* @param resultType {@code InvocableHandlerMethod} return type.
110+
* @return type is {@code Mono}.
111+
* @since 3.2
112+
*/
113+
public static boolean isMono(Class<?> resultType) {
114+
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
115+
}
116+
117+
/**
118+
* Return the true when type is {@code CompletableFuture}.
119+
* @param resultType {@code InvocableHandlerMethod} return type.
120+
* @return type is {@code CompletableFuture}.
121+
* @since 3.2
122+
*/
123+
public static boolean isCompletableFuture(Class<?> resultType) {
124+
return CompletableFuture.class.isAssignableFrom(resultType);
125+
}
126+
88127
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Copyright 2023-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
/**
20+
* Message handler adapter implementing this interface can detect {@link HandlerAdapter} async return types.
21+
*
22+
* @author Wang zhiyang
23+
*
24+
* @since 3.2
25+
*/
26+
public interface AsyncRepliesAware {
27+
28+
/**
29+
* Return true if the {@link HandlerAdapter} return type is async.
30+
* @return true for async replies.
31+
* @since 3.2
32+
*/
33+
default boolean isAsyncReplies() {
34+
return false;
35+
}
36+
37+
}

0 commit comments

Comments
 (0)