Skip to content

Commit c93e1a1

Browse files
Zhiyang.Wang1Wzy19930507
authored andcommitted
GH-1189: @SendTo for @KafkaHandler after error is handled
Sending the result from a `KafkaListenerErrorHandler` was broken for `@KafkaHandler` because the send to expression was lost.
1 parent 1c1cf45 commit c93e1a1

File tree

5 files changed

+48
-22
lines changed

5 files changed

+48
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -337,6 +337,17 @@ private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadC
337337
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
338338
}
339339

340+
@Nullable
341+
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
342+
343+
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
344+
if (handler != null) {
345+
return new InvocationResult(result, this.handlerSendTo.get(handler),
346+
this.handlerReturnsMessage.get(handler));
347+
}
348+
return null;
349+
}
350+
340351
private static final class PayloadValidator extends PayloadMethodArgumentResolver {
341352

342353
PayloadValidator(Validator validator) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener.adapter;
1818

19+
import org.springframework.lang.Nullable;
1920
import org.springframework.messaging.Message;
2021
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
2122

@@ -98,4 +99,13 @@ public Object getBean() {
9899
}
99100
}
100101

102+
@Nullable
103+
public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) {
104+
105+
if (this.delegatingHandler != null && inboundPayload != null) {
106+
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
107+
}
108+
return null;
109+
}
110+
101111
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.Iterator;
2727
import java.util.List;
2828
import java.util.Map;
29+
import java.util.Objects;
2930
import java.util.concurrent.CompletableFuture;
3031
import java.util.stream.Collectors;
3132

@@ -462,16 +463,14 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
462463
String replyTopic = evaluateReplyTopic(request, source, resultArg);
463464
Assert.state(replyTopic == null || this.replyTemplate != null,
464465
"a KafkaTemplate is required to support replies");
465-
Object result;
466-
boolean messageReturnType;
467-
if (resultArg instanceof InvocationResult invocationResult) {
468-
result = invocationResult.getResult();
469-
messageReturnType = invocationResult.isMessageReturnType();
470-
}
471-
else {
472-
result = resultArg;
473-
messageReturnType = this.messageReturnType;
474-
}
466+
467+
Object result = resultArg instanceof InvocationResult invocationResult ?
468+
invocationResult.getResult() :
469+
resultArg;
470+
boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ?
471+
invocationResult.isMessageReturnType() :
472+
this.messageReturnType;
473+
475474
if (result instanceof CompletableFuture<?> completable) {
476475
if (acknowledgment == null || !acknowledgment.isOutOfOrderCommit()) {
477476
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
@@ -677,9 +676,11 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle
677676
if (NULL_MESSAGE.equals(message)) {
678677
message = new GenericMessage<>(records);
679678
}
680-
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
681-
if (result != null) {
682-
handleResult(result, records, acknowledgment, consumer, message);
679+
Object errorResult = this.errorHandler.handleError(message, e, consumer, acknowledgment);
680+
if (errorResult != null && !(errorResult instanceof InvocationResult)) {
681+
Object result = this.handlerMethod.getInvocationResultFor(errorResult, message.getPayload());
682+
handleResult(Objects.requireNonNullElse(result, errorResult),
683+
records, acknowledgment, consumer, message);
683684
}
684685
}
685686
catch (Exception ex) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,10 +470,12 @@ public void testMulti() throws Exception {
470470
assertThat(this.multiListener.latch2.await(60, TimeUnit.SECONDS)).isTrue();
471471
ConsumerRecord<Integer, String> reply = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
472472
assertThat(reply.value()).isEqualTo("OK");
473-
consumer.close();
474473

475474
template.send("annotated8", 0, 1, "junk");
476475
assertThat(this.multiListener.errorLatch.await(60, TimeUnit.SECONDS)).isTrue();
476+
ConsumerRecord<Integer, String> reply2 = KafkaTestUtils.getSingleRecord(consumer, "annotated8reply");
477+
consumer.close();
478+
assertThat(reply2.value()).isEqualTo("JUNK intentional");
477479
assertThat(this.multiListener.meta).isNotNull();
478480
}
479481

@@ -1754,7 +1756,8 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) {
17541756
public KafkaListenerErrorHandler consumeMultiMethodException(MultiListenerBean listener) {
17551757
return (m, e) -> {
17561758
listener.errorLatch.countDown();
1757-
return null;
1759+
String payload = m.getPayload().toString().toUpperCase();
1760+
return payload + " " + e.getCause().getMessage();
17581761
};
17591762
}
17601763

@@ -2468,6 +2471,7 @@ static class MultiListenerBean {
24682471
volatile ConsumerRecordMetadata meta;
24692472

24702473
@KafkaHandler
2474+
@SendTo("annotated8reply")
24712475
public void bar(@NonNull String bar, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
24722476
if ("junk".equals(bar)) {
24732477
throw new RuntimeException("intentional");

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinCoroutinesTests.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,7 +56,7 @@ import java.util.concurrent.TimeUnit
5656
@SpringJUnitConfig
5757
@DirtiesContext
5858
@EmbeddedKafka(topics = ["kotlinAsyncTestTopic1", "kotlinAsyncTestTopic2",
59-
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2"])
59+
"kotlinAsyncBatchTestTopic1", "kotlinAsyncBatchTestTopic2", "sendTopicReply1"])
6060
class EnableKafkaKotlinCoroutinesTests {
6161

6262
@Autowired
@@ -96,7 +96,7 @@ class EnableKafkaKotlinCoroutinesTests {
9696
@Test
9797
fun `test checkedKh reply`() {
9898
this.template.send("kotlinAsyncTestTopic3", "foo")
99-
val cr = this.template.receive("sendTopic1", 0, 0, Duration.ofSeconds(30))
99+
val cr = this.template.receive("sendTopicReply1", 0, 0, Duration.ofSeconds(30))
100100
assertThat(cr.value()).isEqualTo("FOO")
101101
}
102102

@@ -105,7 +105,7 @@ class EnableKafkaKotlinCoroutinesTests {
105105
class Listener {
106106

107107
@KafkaHandler
108-
@SendTo("sendTopic1")
108+
@SendTo("sendTopicReply1")
109109
suspend fun handler1(value: String) : String {
110110
return value.uppercase()
111111
}

0 commit comments

Comments
 (0)