Skip to content

Commit 13ea4ea

Browse files
author
Zhiyang.Wang1
committed
GH-1189: support kotlin suspend
* Support kotlin suspend * Add kotlin suspend test, async request/reply with `@KafkaListener` `@KafkaHandler` and `@SendTo` * Fix async-returns.adoc and async warn log in `MessagingMessageListenerAdapter` * Add dependency `org.jetbrains.kotlinx:kotlinx-coroutines-reactor:1.7.3`
1 parent 72b0060 commit 13ea4ea

File tree

8 files changed

+429
-8
lines changed

8 files changed

+429
-8
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ ext {
6060
junit4Version = '4.13.2'
6161
junitJupiterVersion = '5.10.1'
6262
kafkaVersion = '3.6.1'
63+
kotlinCoroutinesVersion = '1.7.3'
6364
log4jVersion = '2.22.1'
6465
micrometerDocsVersion = '1.0.2'
6566
micrometerVersion = '1.13.0-SNAPSHOT'
@@ -279,6 +280,7 @@ project ('spring-kafka') {
279280
}
280281
api "org.apache.kafka:kafka-clients:$kafkaVersion"
281282
optionalApi "org.apache.kafka:kafka-streams:$kafkaVersion"
283+
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
282284
optionalApi 'com.fasterxml.jackson.core:jackson-core'
283285
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
284286
optionalApi 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ IMPORTANT: The listener container factory must be configured with manual ack mod
2727
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
2828
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.
2929

30-
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure.
30+
If a `KafkaListenerErrorHandler` is configured on a listener with an async return type (including Kotlin suspend functions), the error handler is invoked after a failure.
3131
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
/**
20+
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
21+
* <p>
22+
* This class is similar to
23+
* {@link org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver}
24+
* but for regular {@link HandlerMethodArgumentResolver} contract.
25+
*
26+
* @author Wang Zhiyang
27+
*
28+
* @since 3.1
29+
*
30+
* @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver
31+
*/
32+
import org.springframework.core.MethodParameter;
33+
import org.springframework.messaging.Message;
34+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
35+
36+
import reactor.core.publisher.Mono;
37+
38+
public class ContinuationHandlerMethodArgumentResolver implements HandlerMethodArgumentResolver {
39+
40+
@Override
41+
public boolean supportsParameter(MethodParameter parameter) {
42+
return "kotlin.coroutines.Continuation".equals(parameter.getParameterType().getName());
43+
}
44+
45+
@Override
46+
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
47+
return Mono.empty();
48+
}
49+
50+
}

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1155,7 +1155,7 @@ private MessageHandlerMethodFactory getHandlerMethodFactory() {
11551155
}
11561156

11571157
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
1158-
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
1158+
DefaultMessageHandlerMethodFactory defaultFactory = new KafkaMessageHandlerMethodFactory();
11591159
Validator validator = KafkaListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
11601160
if (validator != null) {
11611161
defaultFactory.setValidator(validator);
@@ -1170,8 +1170,6 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
11701170

11711171
List<HandlerMethodArgumentResolver> customArgumentsResolver =
11721172
new ArrayList<>(KafkaListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
1173-
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
1174-
customArgumentsResolver.add(new KafkaNullAwarePayloadArgumentResolver(messageConverter, validator));
11751173
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
11761174

11771175
defaultFactory.afterPropertiesSet();
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.lang.reflect.Method;
20+
import java.util.List;
21+
22+
import org.springframework.core.KotlinDetector;
23+
import org.springframework.messaging.converter.MessageConverter;
24+
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
25+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
26+
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite;
27+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
28+
import org.springframework.validation.Validator;
29+
30+
/**
31+
* Extension of the {@link DefaultMessageHandlerMethodFactory} for Spring Kafka requirements.
32+
*
33+
* @author Wang Zhiyang
34+
*
35+
* @since 3.1
36+
*/
37+
public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {
38+
39+
private final HandlerMethodArgumentResolverComposite argumentResolvers =
40+
new HandlerMethodArgumentResolverComposite();
41+
42+
private MessageConverter messageConverter;
43+
44+
private Validator validator;
45+
46+
@Override
47+
public void setMessageConverter(MessageConverter messageConverter) {
48+
super.setMessageConverter(messageConverter);
49+
this.messageConverter = messageConverter;
50+
}
51+
52+
@Override
53+
public void setValidator(Validator validator) {
54+
super.setValidator(validator);
55+
this.validator = validator;
56+
}
57+
58+
@Override
59+
protected List<HandlerMethodArgumentResolver> initArgumentResolvers() {
60+
List<HandlerMethodArgumentResolver> resolvers = super.initArgumentResolvers();
61+
if (KotlinDetector.isKotlinPresent()) {
62+
// Insert before PayloadMethodArgumentResolver
63+
resolvers.add(resolvers.size() - 1, new ContinuationHandlerMethodArgumentResolver());
64+
}
65+
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
66+
resolvers.add(resolvers.size() - 1, new KafkaNullAwarePayloadArgumentResolver(this.messageConverter, this.validator));
67+
this.argumentResolvers.addResolvers(resolvers);
68+
return resolvers;
69+
}
70+
71+
@Override
72+
public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
73+
InvocableHandlerMethod handlerMethod = new KotlinAwareInvocableHandlerMethod(bean, method);
74+
handlerMethod.setMessageMethodArgumentResolvers(this.argumentResolvers);
75+
return handlerMethod;
76+
}
77+
78+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.annotation;
18+
19+
import java.lang.reflect.Method;
20+
21+
import org.springframework.core.CoroutinesUtils;
22+
import org.springframework.core.KotlinDetector;
23+
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
24+
25+
/**
26+
* An {@link InvocableHandlerMethod} extension for supporting Kotlin {@code suspend} function.
27+
*
28+
* @author Wang Zhiyang
29+
*
30+
* @since 3.1
31+
*/
32+
public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod {
33+
34+
public KotlinAwareInvocableHandlerMethod(Object bean, Method method) {
35+
super(bean, method);
36+
}
37+
38+
@Override
39+
protected Object doInvoke(Object... args) throws Exception {
40+
Method method = getBridgedMethod();
41+
if (KotlinDetector.isSuspendingFunction(method)) {
42+
return CoroutinesUtils.invokeSuspendingFunction(method, getBean(), args);
43+
}
44+
else {
45+
return super.doInvoke(args);
46+
}
47+
}
48+
49+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.kafka.common.TopicPartition;
3737

3838
import org.springframework.context.expression.MapAccessor;
39+
import org.springframework.core.KotlinDetector;
3940
import org.springframework.core.MethodParameter;
4041
import org.springframework.core.log.LogAccessor;
4142
import org.springframework.expression.BeanResolver;
@@ -484,8 +485,8 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
484485
}
485486
else if (monoPresent && result instanceof Mono<?> mono) {
486487
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
487-
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type; " +
488-
"otherwise the container will ack the message immediately");
488+
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type " +
489+
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
489490
}
490491
mono.subscribe(
491492
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
@@ -660,7 +661,7 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
660661
}
661662
catch (Exception ex) {
662663
}
663-
this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source);
664+
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
664665
acknowledge(acknowledgment);
665666
}
666667

@@ -751,6 +752,8 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
751752
isNotConvertible |= isAck;
752753
boolean isConsumer = parameterIsType(parameterType, Consumer.class);
753754
isNotConvertible |= isConsumer;
755+
boolean isCoroutines = KotlinDetector.isKotlinType(methodParameter.getParameterType());
756+
isNotConvertible |= isCoroutines;
754757
boolean isMeta = parameterIsType(parameterType, ConsumerRecordMetadata.class);
755758
this.hasMetadataParameter |= isMeta;
756759
isNotConvertible |= isMeta;
@@ -769,7 +772,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
769772
break;
770773
}
771774
}
772-
else if (isAck || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
775+
else if (isAck || isCoroutines || isConsumer || annotationHeaderIsGroupId(methodParameter)) {
773776
allowedBatchParameters++;
774777
}
775778
}

0 commit comments

Comments
 (0)