Skip to content

Commit 76a2e0b

Browse files
committed
* polish DelegatingInvocableHandler and add javadoc
* polish `HandlerAdapter` * change `InvocationResult` to record * Optimization `MessagingMessageListenerAdapter.asyncFailure`
1 parent 2a3068e commit 76a2e0b

File tree

4 files changed

+18
-38
lines changed

4 files changed

+18
-38
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,15 @@ private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadC
337337
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
338338
}
339339

340+
/**
341+
* Return the result of a method invocation by providing a result and payload.
342+
* @param result the result.
343+
* @param inboundPayload the payload.
344+
* @return the result of a method invocation.
345+
* @since 3.2
346+
*/
340347
@Nullable
341348
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
342-
343349
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
344350
if (handler != null) {
345351
return new InvocationResult(result, this.handlerSendTo.get(handler),

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ public Object getBean() {
101101

102102
@Nullable
103103
public InvocationResult getInvocationResultFor(Object result, @Nullable Object inboundPayload) {
104-
105104
if (this.delegatingHandler != null && inboundPayload != null) {
106105
return this.delegatingHandler.getInvocationResultFor(result, inboundPayload);
107106
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/InvocationResult.java

Lines changed: 4 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,39 +21,14 @@
2121

2222
/**
2323
* The result of a method invocation.
24+
* @param result the result.
25+
* @param messageReturnType the message return type.
26+
* @param sendTo the expression about sends topic.
2427
*
2528
* @author Gary Russell
2629
* @since 2.2
2730
*/
28-
public final class InvocationResult {
29-
30-
@Nullable
31-
private final Object result;
32-
33-
@Nullable
34-
private final Expression sendTo;
35-
36-
private final boolean messageReturnType;
37-
38-
public InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) {
39-
this.result = result;
40-
this.sendTo = sendTo;
41-
this.messageReturnType = messageReturnType;
42-
}
43-
44-
@Nullable
45-
public Object getResult() {
46-
return this.result;
47-
}
48-
49-
@Nullable
50-
public Expression getSendTo() {
51-
return this.sendTo;
52-
}
53-
54-
public boolean isMessageReturnType() {
55-
return this.messageReturnType;
56-
}
31+
public record InvocationResult(@Nullable Object result, @Nullable Expression sendTo, boolean messageReturnType) {
5732

5833
@Override
5934
public String toString() {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -465,10 +465,10 @@ protected void handleResult(Object resultArg, Object request, @Nullable Acknowle
465465
"a KafkaTemplate is required to support replies");
466466

467467
Object result = resultArg instanceof InvocationResult invocationResult ?
468-
invocationResult.getResult() :
468+
invocationResult.result() :
469469
resultArg;
470470
boolean messageReturnType = resultArg instanceof InvocationResult invocationResult ?
471-
invocationResult.isMessageReturnType() :
471+
invocationResult.messageReturnType() :
472472
this.messageReturnType;
473473

474474
if (result instanceof CompletableFuture<?> completable) {
@@ -506,7 +506,7 @@ else if (monoPresent && result instanceof Mono<?> mono) {
506506
private String evaluateReplyTopic(Object request, Object source, Object result) {
507507
String replyTo = null;
508508
if (result instanceof InvocationResult invResult) {
509-
replyTo = evaluateTopic(request, source, result, invResult.getSendTo());
509+
replyTo = evaluateTopic(request, source, result, invResult.sendTo());
510510
}
511511
else if (this.replyTopicExpression != null) {
512512
replyTo = evaluateTopic(request, source, result, this.replyTopicExpression);
@@ -656,16 +656,16 @@ protected void acknowledge(@Nullable Acknowledgment acknowledgment) {
656656

657657
protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
658658
Throwable t, Message<?> source) {
659+
659660
try {
660661
handleException(request, acknowledgment, consumer, source,
661662
new ListenerExecutionFailedException(createMessagingErrorMessage(
662663
"Async Fail", source.getPayload()), t));
663-
return;
664664
}
665-
catch (Exception ex) {
665+
catch (Throwable ex) {
666+
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
667+
acknowledge(acknowledgment);
666668
}
667-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
668-
acknowledge(acknowledgment);
669669
}
670670

671671
protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,

0 commit comments

Comments
 (0)