Skip to content

Commit 4cd372b

Browse files
committed
* add javadoc in AdapterUtils
* move class from package `annotation` to package `adapter` * re name bar,baz in BatchMessagingMessageListenerAdapterTests * poblish unit test `MessagingMessageListenerAdapterTests` and `EnableKafkaKotlinCoroutinesTests` * poblish doc async-returns.adoc and nav.adoc
1 parent 2ea5282 commit 4cd372b

File tree

11 files changed

+67
-45
lines changed

11 files changed

+67
-45
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/nav.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
**** xref:kafka/receiving-messages/message-listeners.adoc[]
1212
**** xref:kafka/receiving-messages/message-listener-container.adoc[]
1313
**** xref:kafka/receiving-messages/ooo-commits.adoc[]
14+
**** xref:kafka/receiving-messages/async-returns.adoc[]
1415
**** xref:kafka/receiving-messages/listener-annotation.adoc[]
1516
**** xref:kafka/receiving-messages/listener-group-id.adoc[]
1617
**** xref:kafka/receiving-messages/container-thread-naming.adoc[]

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
[[async-returns]]
22
= Asynchronous `@KafkaListener` Return Types
33

4-
`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture<?>` and `Mono<?>`, letting the reply be sent asynchronously.
4+
`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types, letting the reply be sent asynchronously.
5+
return types include `CompletableFuture<?>`, `Mono<?>` and Kotlin `suspend` functions
56

67
[source, java]
78
----

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2023 the original author or authors.
2+
* Copyright 2014-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,6 +87,7 @@
8787
import org.springframework.kafka.listener.ContainerGroupSequencer;
8888
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
8989
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
90+
import org.springframework.kafka.listener.adapter.KafkaMessageHandlerMethodFactory;
9091
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
9192
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
9293
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2023 the original author or authors.
2+
* Copyright 2020-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
* Utilities for listener adapters.
3535
*
3636
* @author Gary Russell
37+
* @author Wang Zhiyang
3738
* @since 2.5
3839
*
3940
*/
@@ -93,15 +94,33 @@ public static String getDefaultReplyTopicExpression() {
9394
+ KafkaHeaders.REPLY_TOPIC + "']" + PARSER_CONTEXT.getExpressionSuffix();
9495
}
9596

96-
static boolean isAsyncReply(Class<?> resultType) {
97+
/**
98+
* Return the true when return types are asynchronous.
99+
* @param resultType {@code InvocableHandlerMethod} return type.
100+
* @return type is {@code Mono} or {@code CompletableFuture}.
101+
* @since 3.2
102+
*/
103+
public static boolean isAsyncReply(Class<?> resultType) {
97104
return isMono(resultType) || isCompletableFuture(resultType);
98105
}
99106

100-
static boolean isMono(Class<?> resultType) {
107+
/**
108+
* Return the true when type is {@code Mono}.
109+
* @param resultType {@code InvocableHandlerMethod} return type.
110+
* @return type is {@code Mono}.
111+
* @since 3.2
112+
*/
113+
public static boolean isMono(Class<?> resultType) {
101114
return MONO_PRESENT && Mono.class.isAssignableFrom(resultType);
102115
}
103116

104-
static boolean isCompletableFuture(Class<?> resultType) {
117+
/**
118+
* Return the true when type is {@code CompletableFuture}.
119+
* @param resultType {@code InvocableHandlerMethod} return type.
120+
* @return type is {@code CompletableFuture}.
121+
* @since 3.2
122+
*/
123+
public static boolean isCompletableFuture(Class<?> resultType) {
105124
return CompletableFuture.class.isAssignableFrom(resultType);
106125
}
107126

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.annotation;
17+
package org.springframework.kafka.listener.adapter;
1818

1919
/**
2020
* No-op resolver for method arguments of type {@link kotlin.coroutines.Continuation}.
@@ -25,7 +25,7 @@
2525
*
2626
* @author Wang Zhiyang
2727
*
28-
* @since 3.1
28+
* @since 3.2
2929
*
3030
* @see org.springframework.messaging.handler.annotation.reactive.ContinuationHandlerMethodArgumentResolver
3131
*/

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaMessageHandlerMethodFactory.java renamed to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaMessageHandlerMethodFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.annotation;
17+
package org.springframework.kafka.listener.adapter;
1818

1919
import java.lang.reflect.Method;
2020
import java.util.List;
@@ -32,7 +32,7 @@
3232
*
3333
* @author Wang Zhiyang
3434
*
35-
* @since 3.1
35+
* @since 3.2
3636
*/
3737
public class KafkaMessageHandlerMethodFactory extends DefaultMessageHandlerMethodFactory {
3838

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021-2023 the original author or authors.
2+
* Copyright 2021-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.annotation;
17+
package org.springframework.kafka.listener.adapter;
1818

1919
import java.util.List;
2020

spring-kafka/src/main/java/org/springframework/kafka/annotation/KotlinAwareInvocableHandlerMethod.java renamed to spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KotlinAwareInvocableHandlerMethod.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023 the original author or authors.
2+
* Copyright 2023-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package org.springframework.kafka.annotation;
17+
package org.springframework.kafka.listener.adapter;
1818

1919
import java.lang.reflect.Method;
2020

@@ -27,7 +27,7 @@
2727
*
2828
* @author Wang Zhiyang
2929
*
30-
* @since 3.1
30+
* @since 3.2
3131
*/
3232
public class KotlinAwareInvocableHandlerMethod extends InvocableHandlerMethod {
3333

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2023 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,6 +50,7 @@
5050

5151
/**
5252
* @author Gary Russell
53+
* @author Wang Zhiyang
5354
* @since 2.2.5
5455
*
5556
*/
@@ -71,35 +72,36 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr
7172

7273
@SuppressWarnings("unchecked")
7374
@Test
74-
public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Bar bar) {
75+
public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry,
76+
@Autowired BatchFuture batchFuture) {
7577

7678
BatchMessagingMessageListenerAdapter<String, String> adapter =
7779
spy((BatchMessagingMessageListenerAdapter<String, String>) registry
78-
.getListenerContainer("bar").getContainerProperties().getMessageListener());
80+
.getListenerContainer("batchFuture").getContainerProperties().getMessageListener());
7981
KafkaUtils.setConsumerGroupId("test.group.future");
8082
List<ConsumerRecord<String, String>> list = new ArrayList<>();
81-
list.add(new ConsumerRecord<>("bar", 0, 0L, null, "future_1"));
82-
list.add(new ConsumerRecord<>("bar", 0, 1L, null, "future_2"));
83-
list.add(new ConsumerRecord<>("bar", 1, 0L, null, "future_3"));
83+
list.add(new ConsumerRecord<>("batchFuture", 0, 0L, null, "future_1"));
84+
list.add(new ConsumerRecord<>("batchFuture", 0, 1L, null, "future_2"));
85+
list.add(new ConsumerRecord<>("batchFuture", 1, 0L, null, "future_3"));
8486
adapter.onMessage(list, null, null);
85-
assertThat(bar.group).isEqualTo("test.group.future");
87+
assertThat(batchFuture.group).isEqualTo("test.group.future");
8688
verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean());
8789
verify(adapter, times(1)).acknowledge(any());
8890
}
8991

9092
@SuppressWarnings("unchecked")
9193
@Test
92-
public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Baz baz) {
94+
public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired BatchMono batchMono) {
9395

9496
BatchMessagingMessageListenerAdapter<String, String> adapter =
9597
spy((BatchMessagingMessageListenerAdapter<String, String>) registry
96-
.getListenerContainer("baz").getContainerProperties().getMessageListener());
98+
.getListenerContainer("batchMono").getContainerProperties().getMessageListener());
9799
KafkaUtils.setConsumerGroupId("test.group.mono");
98100
List<ConsumerRecord<String, String>> list = new ArrayList<>();
99-
list.add(new ConsumerRecord<>("baz", 0, 0L, null, "mono_1"));
100-
list.add(new ConsumerRecord<>("baz", 0, 1L, null, "mono_2"));
101+
list.add(new ConsumerRecord<>("batchMono", 0, 0L, null, "mono_1"));
102+
list.add(new ConsumerRecord<>("batchMono", 0, 1L, null, "mono_2"));
101103
adapter.onMessage(list, null, null);
102-
assertThat(baz.group).isEqualTo("test.group.mono");
104+
assertThat(batchMono.group).isEqualTo("test.group.mono");
103105
verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean());
104106
verify(adapter, times(1)).acknowledge(any());
105107
}
@@ -118,11 +120,11 @@ public void listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String grou
118120

119121
}
120122

121-
public static class Bar {
123+
public static class BatchFuture {
122124

123125
public volatile String group;
124126

125-
@KafkaListener(id = "bar", topics = "bar", autoStartup = "false")
127+
@KafkaListener(id = "batchFuture", topics = "batchFuture", autoStartup = "false")
126128
public CompletableFuture<String> listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String groupId) {
127129

128130
this.group = groupId;
@@ -133,13 +135,13 @@ public CompletableFuture<String> listen(List<String> list, @Header(KafkaHeaders.
133135

134136
}
135137

136-
public static class Baz {
138+
public static class BatchMono {
137139

138140
public volatile String value = "someValue";
139141

140142
public volatile String group;
141143

142-
@KafkaListener(id = "baz", topics = "baz", autoStartup = "false")
144+
@KafkaListener(id = "batchMono", topics = "batchMono", autoStartup = "false")
143145
public Mono<Integer> listen(List<String> list, @Header(KafkaHeaders.GROUP_ID) String groupId) {
144146

145147
this.group = groupId;
@@ -158,13 +160,13 @@ public Foo foo() {
158160
}
159161

160162
@Bean
161-
public Bar bar() {
162-
return new Bar();
163+
public BatchFuture batchFuture() {
164+
return new BatchFuture();
163165
}
164166

165167
@Bean
166-
public Baz baz() {
167-
return new Baz();
168+
public BatchMono batchMono() {
169+
return new BatchMono();
168170
}
169171

170172
@SuppressWarnings({ "rawtypes" })

spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -124,9 +124,7 @@ public void test(Acknowledgment ack) {
124124

125125
public CompletableFuture<String> future(String data, Acknowledgment ack) {
126126

127-
CompletableFuture<String> future = new CompletableFuture<>();
128-
future.complete("processed" + data);
129-
return future;
127+
return CompletableFuture.completedFuture("processed" + data);
130128
}
131129

132130
public Mono<String> mono(String data, Acknowledgment ack) {

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit
5656
@SpringJUnitConfig
5757
@DirtiesContext
5858
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
59-
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"])
59+
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "kotlinReplyTopic1"])
6060
class EnableKafkaKotlinCoroutinesTests {
6161

6262
@Autowired
@@ -96,16 +96,16 @@ class EnableKafkaKotlinCoroutinesTests {
9696
@Test
9797
fun `test checkedKh reply`() {
9898
this.template.send("kotlinAsyncTestTopic3", "foo")
99-
val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30))
100-
assertThat(cr.value()).isEqualTo("FOO")
99+
val cr = this.template.receive("kotlinReplyTopic1", 0, 0, Duration.ofSeconds(30))
100+
assertThat(cr?.value() ?: "null").isEqualTo("FOO")
101101
}
102102

103103
@KafkaListener(id = "sendTopic", topics = ["kotlinAsyncTestTopic3"],
104104
containerFactory = "kafkaListenerContainerFactory")
105105
class Listener {
106106

107107
@KafkaHandler
108-
@SendTo("sendTopicReply1")
108+
@SendTo("kotlinReplyTopic1")
109109
suspend fun handler1(value: String) : String {
110110
return value.uppercase()
111111
}

0 commit comments

Comments
 (0)