Skip to content

Commit 2e59574

Browse files
Zhiyang.Wang1Wzy19930507
authored andcommitted
Refactor MessagingMessageListenerAdapter
* move `BatchMessagingMessageListenerAdapter#invoke` and `RecordMessagingMessageListenerAdapter#invoke` to `MessagingMessageListenerAdapter` * move `KafkaListenerErrorHandler` to `MessagingMessageListenerAdapter` * add `@Nullable` to `KafkaListenerErrorHandler`
1 parent b9c9abd commit 2e59574

File tree

4 files changed

+58
-73
lines changed

4 files changed

+58
-73
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaListenerErrorHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -72,6 +72,7 @@ default Object handleError(Message<?> message, ListenerExecutionFailedException
7272
* @return the return value is ignored unless the annotated method has a
7373
* {@code @SendTo} annotation.
7474
*/
75+
@Nullable
7576
default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
7677
Consumer<?, ?> consumer, @Nullable Acknowledgment ack) {
7778

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,12 @@
2626

2727
import org.springframework.kafka.listener.BatchAcknowledgingConsumerAwareMessageListener;
2828
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
29-
import org.springframework.kafka.listener.ListenerExecutionFailedException;
3029
import org.springframework.kafka.support.Acknowledgment;
3130
import org.springframework.kafka.support.converter.BatchMessageConverter;
3231
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
3332
import org.springframework.kafka.support.converter.RecordMessageConverter;
3433
import org.springframework.lang.Nullable;
3534
import org.springframework.messaging.Message;
36-
import org.springframework.messaging.support.GenericMessage;
3735
import org.springframework.messaging.support.MessageBuilder;
3836
import org.springframework.util.Assert;
3937

@@ -56,15 +54,14 @@
5654
* @author Gary Russell
5755
* @author Artem Bilan
5856
* @author Venil Noronha
57+
* @author Wang ZhiYang
5958
* @since 1.1
6059
*/
6160
public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
6261
implements BatchAcknowledgingConsumerAwareMessageListener<K, V> {
6362

6463
private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
6564

66-
private final KafkaListenerErrorHandler errorHandler;
67-
6865
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
6966

7067
/**
@@ -85,8 +82,7 @@ public BatchMessagingMessageListenerAdapter(Object bean, Method method) {
8582
public BatchMessagingMessageListenerAdapter(Object bean, Method method,
8683
@Nullable KafkaListenerErrorHandler errorHandler) {
8784

88-
super(bean, method);
89-
this.errorHandler = errorHandler;
85+
super(bean, method, errorHandler);
9086
}
9187

9288
/**
@@ -172,39 +168,6 @@ public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgme
172168
invoke(records, acknowledgment, consumer, message);
173169
}
174170

175-
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
176-
final Message<?> messageArg) {
177-
178-
Message<?> message = messageArg;
179-
try {
180-
Object result = invokeHandler(records, acknowledgment, message, consumer);
181-
if (result != null) {
182-
handleResult(result, records, message);
183-
}
184-
}
185-
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
186-
if (this.errorHandler != null) {
187-
try {
188-
if (message.equals(NULL_MESSAGE)) {
189-
message = new GenericMessage<>(records);
190-
}
191-
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
192-
if (result != null) {
193-
handleResult(result, records, message);
194-
}
195-
}
196-
catch (Exception ex) {
197-
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
198-
"Listener error handler threw an exception for the incoming message",
199-
message.getPayload()), ex);
200-
}
201-
}
202-
else {
203-
throw e;
204-
}
205-
}
206-
}
207-
208171
@SuppressWarnings({ "unchecked", "rawtypes" })
209172
protected Message<?> toMessagingMessage(List records, @Nullable Acknowledgment acknowledgment,
210173
Consumer<?, ?> consumer) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.springframework.expression.spel.support.StandardTypeConverter;
4646
import org.springframework.kafka.core.KafkaTemplate;
4747
import org.springframework.kafka.listener.ConsumerSeekAware;
48+
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
4849
import org.springframework.kafka.listener.ListenerExecutionFailedException;
4950
import org.springframework.kafka.support.Acknowledgment;
5051
import org.springframework.kafka.support.KafkaHeaders;
@@ -102,6 +103,8 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
102103

103104
private final StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
104105

106+
private final KafkaListenerErrorHandler errorHandler;
107+
105108
private HandlerAdapter handlerMethod;
106109

107110
private boolean isConsumerRecordList;
@@ -143,8 +146,19 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
143146
* @param method the method.
144147
*/
145148
protected MessagingMessageListenerAdapter(Object bean, Method method) {
149+
this(bean, method, null);
150+
}
151+
152+
/**
153+
* Create an instance with the provided bean, method and kafka listener error handler.
154+
* @param bean the bean.
155+
* @param method the method.
156+
* @param errorHandler the kafka listener error handler.
157+
*/
158+
protected MessagingMessageListenerAdapter(Object bean, Method method, @Nullable KafkaListenerErrorHandler errorHandler) {
146159
this.bean = bean;
147160
this.inferredType = determineInferredType(method); // NOSONAR = intentionally not final
161+
this.errorHandler = errorHandler;
148162
}
149163

150164
/**
@@ -348,6 +362,20 @@ protected Message<?> toMessagingMessage(ConsumerRecord<K, V> cRecord, @Nullable
348362
return getMessageConverter().toMessage(cRecord, acknowledgment, consumer, getType());
349363
}
350364

365+
protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
366+
final Message<?> message) {
367+
368+
try {
369+
Object result = invokeHandler(records, acknowledgment, message, consumer);
370+
if (result != null) {
371+
handleResult(result, records, message);
372+
}
373+
}
374+
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
375+
handleException(records, acknowledgment, consumer, message, e);
376+
}
377+
}
378+
351379
/**
352380
* Invoke the handler, wrapping any exception to a {@link ListenerExecutionFailedException}
353381
* with a dedicated error message.
@@ -558,6 +586,30 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
558586
this.replyTemplate.send(builder.build());
559587
}
560588

589+
protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
590+
Message<?> message, ListenerExecutionFailedException e) {
591+
592+
if (this.errorHandler != null) {
593+
try {
594+
if (NULL_MESSAGE.equals(message)) {
595+
message = new GenericMessage<>(records);
596+
}
597+
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
598+
if (result != null) {
599+
handleResult(result, records, message);
600+
}
601+
}
602+
catch (Exception ex) {
603+
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
604+
"Listener error handler threw an exception for the incoming message",
605+
message.getPayload()), ex);
606+
}
607+
}
608+
else {
609+
throw e;
610+
}
611+
}
612+
561613
private void setCorrelation(MessageBuilder<?> builder, Message<?> source) {
562614
byte[] correlationBytes = getCorrelation(source);
563615
if (correlationBytes != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/RecordMessagingMessageListenerAdapter.java

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323

2424
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
2525
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
26-
import org.springframework.kafka.listener.ListenerExecutionFailedException;
2726
import org.springframework.kafka.support.Acknowledgment;
2827
import org.springframework.kafka.support.converter.ProjectingMessageConverter;
2928
import org.springframework.lang.Nullable;
3029
import org.springframework.messaging.Message;
31-
import org.springframework.messaging.support.GenericMessage;
3230

3331

3432
/**
@@ -53,17 +51,14 @@
5351
public class RecordMessagingMessageListenerAdapter<K, V> extends MessagingMessageListenerAdapter<K, V>
5452
implements AcknowledgingConsumerAwareMessageListener<K, V> {
5553

56-
private final KafkaListenerErrorHandler errorHandler;
57-
5854
public RecordMessagingMessageListenerAdapter(Object bean, Method method) {
5955
this(bean, method, null);
6056
}
6157

6258
public RecordMessagingMessageListenerAdapter(Object bean, Method method,
6359
@Nullable KafkaListenerErrorHandler errorHandler) {
6460

65-
super(bean, method);
66-
this.errorHandler = errorHandler;
61+
super(bean, method, errorHandler);
6762
}
6863

6964
/**
@@ -88,33 +83,7 @@ public void onMessage(ConsumerRecord<K, V> record, @Nullable Acknowledgment ackn
8883
if (logger.isDebugEnabled() && !(getMessageConverter() instanceof ProjectingMessageConverter)) {
8984
this.logger.debug("Processing [" + message + "]");
9085
}
91-
try {
92-
Object result = invokeHandler(record, acknowledgment, message, consumer);
93-
if (result != null) {
94-
handleResult(result, record, message);
95-
}
96-
}
97-
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
98-
if (this.errorHandler != null) {
99-
try {
100-
if (message.equals(NULL_MESSAGE)) {
101-
message = new GenericMessage<>(record);
102-
}
103-
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
104-
if (result != null) {
105-
handleResult(result, record, message);
106-
}
107-
}
108-
catch (Exception ex) {
109-
throw new ListenerExecutionFailedException(createMessagingErrorMessage(// NOSONAR stack trace loss
110-
"Listener error handler threw an exception for the incoming message",
111-
message.getPayload()), ex);
112-
}
113-
}
114-
else {
115-
throw e;
116-
}
117-
}
86+
invoke(record, acknowledgment, consumer, message);
11887
}
11988

12089
}

0 commit comments

Comments
 (0)