Skip to content

Commit ef719b5

Browse files
authored
GH-2976: Cleanup in KafkaListener infra classes
Fixes: #2976 Minor improvement in `DelegatingInvocableHandler` and AKLE.
1 parent f245b59 commit ef719b5

File tree

5 files changed

+37
-40
lines changed

5 files changed

+37
-40
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerEndpoint.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -123,9 +123,9 @@ public abstract class AbstractKafkaListenerEndpoint<K, V>
123123
@Override
124124
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
125125
this.beanFactory = beanFactory;
126-
if (beanFactory instanceof ConfigurableListableBeanFactory) {
127-
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
128-
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory, null);
126+
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
127+
this.resolver = configurableListableBeanFactory.getBeanExpressionResolver();
128+
this.expressionContext = new BeanExpressionContext(configurableListableBeanFactory, null);
129129
}
130130
this.beanResolver = new BeanFactoryResolver(beanFactory);
131131
}
@@ -275,7 +275,7 @@ public void setGroup(String group) {
275275
* @since 1.1
276276
*/
277277
public boolean isBatchListener() {
278-
return this.batchListener == null ? false : this.batchListener;
278+
return this.batchListener != null && this.batchListener;
279279
}
280280

281281
/**
@@ -530,11 +530,10 @@ private void setupMessageListener(MessageListenerContainer container,
530530
.acceptIfNotNull(this.correlationHeaderName, adapter::setCorrelationHeaderName);
531531
adapter.setSplitIterables(this.splitIterables);
532532
Object messageListener = adapter;
533-
boolean isBatchListener = isBatchListener();
534533
Assert.state(messageListener != null,
535534
() -> "Endpoint [" + this + "] must provide a non null message listener");
536535
if (this.recordFilterStrategy != null) {
537-
if (isBatchListener) {
536+
if (isBatchListener()) {
538537
if (((MessagingMessageListenerAdapter<K, V>) messageListener).isConsumerRecords()) {
539538
this.logger.warn(() -> "Filter strategy ignored when consuming 'ConsumerRecords' instead of a List"
540539
+ (this.id != null ? " id: " + this.id : ""));

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -207,22 +207,22 @@ protected MessagingMessageListenerAdapter<K, V> createMessageListenerInstance(
207207

208208
MessagingMessageListenerAdapter<K, V> listener;
209209
if (isBatchListener()) {
210-
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<K, V>(
210+
BatchMessagingMessageListenerAdapter<K, V> messageListener = new BatchMessagingMessageListenerAdapter<>(
211211
this.bean, this.method, this.errorHandler);
212212
BatchToRecordAdapter<K, V> batchToRecordAdapter = getBatchToRecordAdapter();
213213
if (batchToRecordAdapter != null) {
214214
messageListener.setBatchToRecordAdapter(batchToRecordAdapter);
215215
}
216-
if (messageConverter instanceof BatchMessageConverter) {
217-
messageListener.setBatchMessageConverter((BatchMessageConverter) messageConverter);
216+
if (messageConverter instanceof BatchMessageConverter batchMessageConverter) {
217+
messageListener.setBatchMessageConverter(batchMessageConverter);
218218
}
219219
listener = messageListener;
220220
}
221221
else {
222-
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<K, V>(
222+
RecordMessagingMessageListenerAdapter<K, V> messageListener = new RecordMessagingMessageListenerAdapter<>(
223223
this.bean, this.method, this.errorHandler);
224-
if (messageConverter instanceof RecordMessageConverter) {
225-
messageListener.setMessageConverter((RecordMessageConverter) messageConverter);
224+
if (messageConverter instanceof RecordMessageConverter recordMessageConverter) {
225+
messageListener.setMessageConverter(recordMessageConverter);
226226
}
227227
listener = messageListener;
228228
}

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -71,7 +71,7 @@ public void setValidator(Validator validator) {
7171

7272
@Override
7373
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
74-
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
74+
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<>();
7575
InvocableHandlerMethod defaultHandler = null;
7676
for (Method method : this.methods) {
7777
InvocableHandlerMethod handler = getMessageHandlerMethodFactory()

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AdapterUtils.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -67,10 +67,9 @@ public static Object buildConsumerRecordMetadataFromArray(Object... data) {
6767
*/
6868
@Nullable
6969
public static ConsumerRecordMetadata buildConsumerRecordMetadata(Object data) {
70-
if (!(data instanceof ConsumerRecord)) {
70+
if (!(data instanceof ConsumerRecord<?, ?> record)) {
7171
return null;
7272
}
73-
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) data;
7473
return new ConsumerRecordMetadata(new RecordMetadata(new TopicPartition(record.topic(), record.partition()),
7574
record.offset(), 0, record.timestamp(), record.serializedKeySize(),
7675
record.serializedValueSize()), record.timestampType());

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
112112
this.bean = bean;
113113
this.resolver = beanExpressionResolver;
114114
this.beanExpressionContext = beanExpressionContext;
115-
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
116-
? (ConfigurableListableBeanFactory) beanFactory
115+
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory
116+
? configurableListableBeanFactory
117117
: null;
118118
this.validator = validator == null ? null : new PayloadValidator(validator);
119119
}
@@ -124,7 +124,7 @@ private void checkSpecial(@Nullable InvocableHandlerMethod handler) {
124124
}
125125
Parameter[] parameters = handler.getMethod().getParameters();
126126
for (Parameter parameter : parameters) {
127-
if (parameter.getType().equals(ConsumerRecordMetadata.class)) {
127+
if (ConsumerRecordMetadata.class.equals(parameter.getType())) {
128128
this.handlerMetadataAware.put(handler, true);
129129
return;
130130
}
@@ -148,7 +148,7 @@ public Object getBean() {
148148
* or the method raised an exception.
149149
*/
150150
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
151-
Class<? extends Object> payloadClass = message.getPayload().getClass();
151+
Class<?> payloadClass = message.getPayload().getClass();
152152
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
153153
if (this.validator != null && this.defaultHandler != null) {
154154
MethodParameter parameter = this.payloadMethodParameters.get(handler);
@@ -175,7 +175,7 @@ public Object invoke(Message<?> message, Object... providedArgs) throws Exceptio
175175
* @param payloadClass the payload class.
176176
* @return the handler.
177177
*/
178-
protected InvocableHandlerMethod getHandlerForPayload(Class<? extends Object> payloadClass) {
178+
protected InvocableHandlerMethod getHandlerForPayload(Class<?> payloadClass) {
179179
InvocableHandlerMethod handler = this.cachedHandlers.get(payloadClass);
180180
if (handler == null) {
181181
handler = findHandlerForPayload(payloadClass);
@@ -246,36 +246,32 @@ protected InvocableHandlerMethod findHandlerForPayload(Class<? extends Object> p
246246
InvocableHandlerMethod result = null;
247247
for (InvocableHandlerMethod handler : this.handlers) {
248248
if (matchHandlerMethod(payloadClass, handler)) {
249-
if (result != null) {
250-
boolean resultIsDefault = result.equals(this.defaultHandler);
251-
if (!handler.equals(this.defaultHandler) && !resultIsDefault) {
249+
if (result != null && !result.equals(this.defaultHandler)) {
250+
if (!handler.equals(this.defaultHandler)) {
252251
throw new KafkaException("Ambiguous methods for payload type: " + payloadClass + ": " +
253252
result.getMethod().getName() + " and " + handler.getMethod().getName());
254253
}
255-
if (!resultIsDefault) {
256-
continue; // otherwise replace the result with the actual match
257-
}
254+
continue; // otherwise replace the result with the actual match
258255
}
259256
result = handler;
260257
}
261258
}
262259
return result != null ? result : this.defaultHandler;
263260
}
264261

265-
protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, InvocableHandlerMethod handler) {
262+
protected boolean matchHandlerMethod(Class<?> payloadClass, InvocableHandlerMethod handler) {
266263
Method method = handler.getMethod();
267264
Annotation[][] parameterAnnotations = method.getParameterAnnotations();
268265
// Single param; no annotation or not @Header
269266
if (parameterAnnotations.length == 1) {
270267
MethodParameter methodParameter = new MethodParameter(method, 0);
271-
if ((methodParameter.getParameterAnnotations().length == 0
272-
|| !methodParameter.hasParameterAnnotation(Header.class))
273-
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
268+
boolean isPayload = assignPayload(methodParameter, payloadClass);
269+
if (isPayload) {
274270
if (this.validator != null) {
275271
this.payloadMethodParameters.put(handler, methodParameter);
276272
}
277-
return true;
278273
}
274+
return isPayload;
279275
}
280276

281277
MethodParameter foundCandidate = findCandidate(payloadClass, method, parameterAnnotations);
@@ -285,14 +281,12 @@ protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, Invoc
285281
return foundCandidate != null;
286282
}
287283

288-
private MethodParameter findCandidate(Class<? extends Object> payloadClass, Method method,
289-
Annotation[][] parameterAnnotations) {
284+
@Nullable
285+
private MethodParameter findCandidate(Class<?> payloadClass, Method method, Annotation[][] parameterAnnotations) {
290286
MethodParameter foundCandidate = null;
291287
for (int i = 0; i < parameterAnnotations.length; i++) {
292288
MethodParameter methodParameter = new MethodParameter(method, i);
293-
if ((methodParameter.getParameterAnnotations().length == 0
294-
|| !methodParameter.hasParameterAnnotation(Header.class))
295-
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
289+
if (assignPayload(methodParameter, payloadClass)) {
296290
if (foundCandidate != null) {
297291
throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString());
298292
}
@@ -316,15 +310,20 @@ public boolean hasDefaultHandler() {
316310
return this.defaultHandler != null;
317311
}
318312

313+
private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadClass) {
314+
return (methodParameter.getParameterAnnotations().length == 0
315+
|| !methodParameter.hasParameterAnnotation(Header.class))
316+
&& methodParameter.getParameterType().isAssignableFrom(payloadClass);
317+
}
318+
319319
private static final class PayloadValidator extends PayloadMethodArgumentResolver {
320320

321321
PayloadValidator(Validator validator) {
322322
super(new MessageConverter() { // Required but never used
323323

324324
@Override
325325
@Nullable
326-
public Message<?> toMessage(Object payload, @Nullable
327-
MessageHeaders headers) {
326+
public Message<?> toMessage(Object payload, @Nullable MessageHeaders headers) {
328327
return null;
329328
}
330329

0 commit comments

Comments
 (0)