Skip to content

Commit 89fc6d0

Browse files
author
Zhiyang.Wang1
committed
GH-1189: Auto detect async reply
* auto-detect async reply than coerce the out-of-order manual commit. * add new interface `HandlerMethodDetect` to detect handler args and return type. * add auto-detect async reply than coerce the out-of-order manual commit unit test at `@KafkaListener` `@KafkaHandler` scene. * modify async-returns.adoc
1 parent 13ea4ea commit 89fc6d0

File tree

10 files changed

+257
-37
lines changed

10 files changed

+257
-37
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public Mono<Void> listen(String data) {
2323
}
2424
----
2525

26-
IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes.
26+
IMPORTANT: The `AckMode` will be automatically set the `MANUAL` and enable out-of-order commits when async return types are detected; instead, the asynchronous completion will ack when the async operation completes.
2727
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
2828
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.
2929

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.springframework.kafka.listener.ContainerProperties.AckMode;
105105
import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
106106
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
107+
import org.springframework.kafka.listener.adapter.HandlerMethodDetect;
107108
import org.springframework.kafka.support.Acknowledgment;
108109
import org.springframework.kafka.support.KafkaHeaders;
109110
import org.springframework.kafka.support.KafkaUtils;
@@ -160,6 +161,7 @@
160161
* @author Francois Rosiere
161162
* @author Daniel Gentes
162163
* @author Soby Chacko
164+
* @author Wang Zhiyang
163165
*/
164166
public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
165167
extends AbstractMessageListenerContainer<K, V> implements ConsumerPauseResumeEventPublisher {
@@ -660,6 +662,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
660662

661663
private final boolean wantsFullRecords;
662664

665+
private final boolean asyncReplies;
666+
663667
private final boolean autoCommit;
664668

665669
private final boolean isManualAck;
@@ -850,6 +854,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
850854
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType,
851855
ObservationRegistry observationRegistry) {
852856

857+
this.asyncReplies = listener instanceof HandlerMethodDetect hmd && hmd.isAsyncReplies()
858+
|| this.containerProperties.isAsyncAcks();
853859
AckMode ackMode = determineAckMode();
854860
this.isManualAck = ackMode.equals(AckMode.MANUAL);
855861
this.isCountAck = ackMode.equals(AckMode.COUNT)
@@ -860,12 +866,12 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
860866
this.isAnyManualAck = this.isManualAck || this.isManualImmediateAck;
861867
this.isRecordAck = ackMode.equals(AckMode.RECORD);
862868
this.offsetsInThisBatch =
863-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
864-
? new HashMap<>()
869+
this.isAnyManualAck && this.asyncReplies
870+
? new ConcurrentHashMap<>()
865871
: null;
866872
this.deferredOffsets =
867-
this.isAnyManualAck && this.containerProperties.isAsyncAcks()
868-
? new HashMap<>()
873+
this.isAnyManualAck && this.asyncReplies
874+
? new ConcurrentHashMap<>()
869875
: null;
870876

871877
this.observationRegistry = observationRegistry;
@@ -904,8 +910,7 @@ else if (listener instanceof MessageListener) {
904910
else {
905911
throw new IllegalArgumentException("Listener must be one of 'MessageListener', "
906912
+ "'BatchMessageListener', or the variants that are consumer aware and/or "
907-
+ "Acknowledging"
908-
+ " not " + listener.getClass().getName());
913+
+ "Acknowledging not " + listener.getClass().getName());
909914
}
910915
this.listenerType = listenerType;
911916
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
@@ -928,18 +933,15 @@ else if (listener instanceof MessageListener) {
928933
this.logger.info(toString());
929934
}
930935
ApplicationContext applicationContext = getApplicationContext();
936+
ClassLoader classLoader = applicationContext == null
937+
? getClass().getClassLoader()
938+
: applicationContext.getClassLoader();
931939
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
932940
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
933-
consumerProperties, false,
934-
applicationContext == null
935-
? getClass().getClassLoader()
936-
: applicationContext.getClassLoader());
941+
consumerProperties, false, classLoader);
937942
this.checkNullValueForExceptions = this.containerProperties.isCheckDeserExWhenValueNull()
938943
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
939-
consumerProperties, true,
940-
applicationContext == null
941-
? getClass().getClassLoader()
942-
: applicationContext.getClassLoader());
944+
consumerProperties, true, classLoader);
943945
this.syncCommitTimeout = determineSyncCommitTimeout();
944946
if (this.containerProperties.getSyncCommitTimeout() == null) {
945947
// update the property, so we can use it directly from code elsewhere
@@ -965,6 +967,9 @@ private AckMode determineAckMode() {
965967
if (this.consumerGroupId == null && KafkaMessageListenerContainer.this.topicPartitions != null) {
966968
ackMode = AckMode.MANUAL;
967969
}
970+
if (this.asyncReplies && !(AckMode.MANUAL_IMMEDIATE.equals(ackMode) || AckMode.MANUAL.equals(ackMode))) {
971+
ackMode = AckMode.MANUAL;
972+
}
968973
return ackMode;
969974
}
970975

@@ -3389,15 +3394,15 @@ public void acknowledge() {
33893394
public void nack(Duration sleep) {
33903395
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
33913396
"nack() can only be called on the consumer thread");
3392-
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3393-
"nack() is not supported with out-of-order commits (asyncAcks=true)");
3397+
Assert.state(!ListenerConsumer.this.asyncReplies,
3398+
"nack() is not supported with out-of-order commits");
33943399
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
33953400
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
33963401
}
33973402

33983403
@Override
3399-
public boolean isAsyncAcks() {
3400-
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
3404+
public boolean isOutOfOrderCommit() {
3405+
return ListenerConsumer.this.asyncReplies;
34013406
}
34023407

34033408
@Override
@@ -3474,8 +3479,8 @@ public void acknowledge(int index) {
34743479
public void nack(int index, Duration sleep) {
34753480
Assert.state(Thread.currentThread().equals(ListenerConsumer.this.consumerThread),
34763481
"nack() can only be called on the consumer thread");
3477-
Assert.state(!ListenerConsumer.this.containerProperties.isAsyncAcks(),
3478-
"nack() is not supported with out-of-order commits (asyncAcks=true)");
3482+
Assert.state(!ListenerConsumer.this.asyncReplies,
3483+
"nack() is not supported with out-of-order commits");
34793484
Assert.isTrue(!sleep.isNegative(), "sleep cannot be negative");
34803485
Assert.isTrue(index >= 0 && index < this.records.count(), "index out of bounds");
34813486
ListenerConsumer.this.nackIndex = index;
@@ -3499,8 +3504,8 @@ public void nack(int index, Duration sleep) {
34993504
}
35003505

35013506
@Override
3502-
public boolean isAsyncAcks() {
3503-
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
3507+
public boolean isOutOfOrderCommit() {
3508+
return ListenerConsumer.this.asyncReplies;
35043509
}
35053510

35063511
@Override

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
import org.apache.kafka.clients.consumer.ConsumerRecord;
2022
import org.apache.kafka.clients.producer.RecordMetadata;
2123
import org.apache.kafka.common.TopicPartition;
@@ -24,6 +26,9 @@
2426
import org.springframework.expression.common.TemplateParserContext;
2527
import org.springframework.kafka.support.KafkaHeaders;
2628
import org.springframework.lang.Nullable;
29+
import org.springframework.util.ClassUtils;
30+
31+
import reactor.core.publisher.Mono;
2732

2833
/**
2934
* Utilities for listener adapters.
@@ -40,6 +45,9 @@ public final class AdapterUtils {
4045
*/
4146
public static final ParserContext PARSER_CONTEXT = new TemplateParserContext("!{", "}");
4247

48+
private static final boolean MONO_PRESENT =
49+
ClassUtils.isPresent("reactor.core.publisher.Mono", AdapterUtils.class.getClassLoader());
50+
4351
private AdapterUtils() {
4452
}
4553

@@ -85,4 +93,16 @@ public static String getDefaultReplyTopicExpression() {
8593
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
8694
}
8795

96+
static boolean isAsyncReply(Class<?> resultType) {
97+
return isMono(resultType) || isCompletableFuture(resultType);
98+
}
99+
100+
static boolean isMono(Class<?> resultType) {
101+
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
102+
}
103+
104+
static boolean isCompletableFuture(Class<?> resultType) {
105+
return CompletableFuture.class.isAssignableFrom(resultType);
106+
}
107+
88108
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
* unambiguous.
5656
*
5757
* @author Gary Russell
58+
* @author Wang Zhiyang
5859
*
5960
*/
6061
public class DelegatingInvocableHandler {
@@ -86,6 +87,8 @@ public class DelegatingInvocableHandler {
8687

8788
private final PayloadValidator validator;
8889

90+
private final boolean asyncReplies;
91+
8992
/**
9093
* Construct an instance with the supplied handlers for the bean.
9194
* @param handlers the handlers.
@@ -116,6 +119,15 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
116119
? configurableListableBeanFactory
117120
: null;
118121
this.validator = validator == null ? null : new PayloadValidator(validator);
122+
boolean asyncReplies = defaultHandler != null && isAsyncReply(defaultHandler);
123+
for (InvocableHandlerMethod handlerMethod : handlers) {
124+
asyncReplies |= isAsyncReply(handlerMethod);
125+
}
126+
this.asyncReplies = asyncReplies;
127+
}
128+
129+
private boolean isAsyncReply(InvocableHandlerMethod method) {
130+
return AdapterUtils.isAsyncReply(method.getMethod().getReturnType());
119131
}
120132

121133
private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
@@ -139,6 +151,15 @@ public Object getBean() {
139151
return this.bean;
140152
}
141153

154+
/**
155+
* Return true if any handler method has an async reply type.
156+
* @return the asyncReply.
157+
* @since 3.2
158+
*/
159+
public boolean isAsyncReplies() {
160+
return this.asyncReplies;
161+
}
162+
142163
/**
143164
* Invoke the method with the given message.
144165
* @param message the message.

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,16 @@ public class HandlerAdapter {
3333

3434
private final DelegatingInvocableHandler delegatingHandler;
3535

36+
private final boolean asyncReplies;
37+
3638
/**
3739
* Construct an instance with the provided method.
3840
* @param invokerHandlerMethod the method.
3941
*/
4042
public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4143
this.invokerHandlerMethod = invokerHandlerMethod;
4244
this.delegatingHandler = null;
45+
this.asyncReplies = AdapterUtils.isAsyncReply(invokerHandlerMethod.getMethod().getReturnType());
4346
}
4447

4548
/**
@@ -49,6 +52,16 @@ public HandlerAdapter(InvocableHandlerMethod invokerHandlerMethod) {
4952
public HandlerAdapter(DelegatingInvocableHandler delegatingHandler) {
5053
this.invokerHandlerMethod = null;
5154
this.delegatingHandler = delegatingHandler;
55+
this.asyncReplies = delegatingHandler.isAsyncReplies();
56+
}
57+
58+
/**
59+
* Return true if any handler method has an async reply type.
60+
* @return the asyncReply.
61+
* @since 3.2
62+
*/
63+
public boolean isAsyncReplies() {
64+
return this.asyncReplies;
5265
}
5366

5467
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener.adapter;
18+
19+
/**
20+
* Auto-detect {@link HandlerAdapter} args and return type.
21+
*
22+
* @author Wang zhiyang
23+
* @since 3.2
24+
*/
25+
public interface HandlerMethodDetect {
26+
27+
/**
28+
* Return true if this listener is request/reply and the replies are async.
29+
* @return true for async replies.
30+
* @since 3.2
31+
*/
32+
default boolean isAsyncReplies() {
33+
return false;
34+
}
35+
36+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@
9090
* @author Nathan Xu
9191
* @author Wang ZhiYang
9292
*/
93-
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware {
93+
public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerSeekAware, HandlerMethodDetect {
9494

9595
private static final SpelExpressionParser PARSER = new SpelExpressionParser();
9696

@@ -243,6 +243,10 @@ public void setHandlerMethod(HandlerAdapter handlerMethod) {
243243
this.handlerMethod = handlerMethod;
244244
}
245245

246+
public boolean isAsyncReplies() {
247+
return this.handlerMethod.isAsyncReplies();
248+
}
249+
246250
protected boolean isConsumerRecordList() {
247251
return this.isConsumerRecordList;
248252
}
@@ -469,7 +473,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
469473
messageReturnType = this.messageReturnType;
470474
}
471475
if (result instanceof CompletableFuture<?> completable) {
472-
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
476+
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
473477
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
474478
+ "otherwise the container will ack the message immediately");
475479
}
@@ -484,7 +488,7 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
484488
});
485489
}
486490
else if (monoPresent && result instanceof Mono<?> mono) {
487-
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
491+
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
488492
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
489493
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
490494
}

spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ default void nack(int index, Duration sleep) {
8181
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
8282
}
8383

84-
default boolean isAsyncAcks() {
84+
default boolean isOutOfOrderCommit() {
8585
return false;
8686
}
8787

0 commit comments

Comments
 (0)