Skip to content

Commit 3d1e450

Browse files
committed
Add nullability changes in listener/adapter package
#3762 Signed-off-by: Soby Chacko <[email protected]>
1 parent efd3484 commit 3d1e450

18 files changed

+100
-85
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericMessageListener.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
5656
* @param consumer the consumer.
5757
* @since 2.0
5858
*/
59-
default void onMessage(T data, Consumer<?, ?> consumer) {
59+
default void onMessage(T data, @Nullable Consumer<?, ?> consumer) {
6060
throw new UnsupportedOperationException("Container should never call this");
6161
}
6262

@@ -68,7 +68,7 @@ default void onMessage(T data, Consumer<?, ?> consumer) {
6868
* @param consumer the consumer.
6969
* @since 2.0
7070
*/
71-
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
71+
default void onMessage(T data, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
7272
throw new UnsupportedOperationException("Container should never call this");
7373
}
7474

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.consumer.Consumer;
2020
import org.apache.kafka.common.TopicPartition;
21+
import org.jspecify.annotations.Nullable;
2122

2223
/**
2324
* Interface for backing off a {@link MessageListenerContainer}
@@ -32,7 +33,7 @@ public interface KafkaConsumerBackoffManager {
3233
void backOffIfNecessary(Context context);
3334

3435
default Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition,
35-
Consumer<?, ?> messageConsumer) {
36+
@Nullable Consumer<?, ?> messageConsumer) {
3637
return new Context(dueTimestamp, topicPartition, listenerId, messageConsumer);
3738
}
3839

@@ -64,7 +65,7 @@ class Context {
6465
private final Consumer<?, ?> consumerForTimingAdjustment;
6566

6667
Context(long dueTimestamp, TopicPartition topicPartition, String listenerId,
67-
Consumer<?, ?> consumerForTimingAdjustment) {
68+
@Nullable Consumer<?, ?> consumerForTimingAdjustment) {
6869

6970
this.dueTimestamp = dueTimestamp;
7071
this.listenerId = listenerId;
@@ -84,7 +85,7 @@ public TopicPartition getTopicPartition() {
8485
return this.topicPartition;
8586
}
8687

87-
public Consumer<?, ?> getConsumerForTimingAdjustment() {
88+
public @Nullable Consumer<?, ?> getConsumerForTimingAdjustment() {
8889
return this.consumerForTimingAdjustment;
8990
}
9091

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractDelegatingMessageListenerAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121

2222
import org.apache.commons.logging.LogFactory;
2323
import org.apache.kafka.common.TopicPartition;
24+
import org.jspecify.annotations.Nullable;
2425

2526
import org.springframework.core.log.LogAccessor;
2627
import org.springframework.kafka.listener.ConsumerSeekAware;
@@ -46,7 +47,7 @@ public abstract class AbstractDelegatingMessageListenerAdapter<T>
4647

4748
protected final ListenerType delegateType; // NOSONAR
4849

49-
private final ConsumerSeekAware seekAware;
50+
private final @Nullable ConsumerSeekAware seekAware;
5051

5152
public AbstractDelegatingMessageListenerAdapter(T delegate) {
5253
this.delegate = delegate;

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractRetryingMessageListenerAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public abstract class AbstractRetryingMessageListenerAdapter<K, V, T>
3737

3838
private final RetryTemplate retryTemplate;
3939

40-
private final RecoveryCallback<? extends Object> recoveryCallback;
40+
private final @Nullable RecoveryCallback<? extends Object> recoveryCallback;
4141

4242
/**
4343
* Construct an instance with the supplied retry template. The exception will be
@@ -69,7 +69,7 @@ public RetryTemplate getRetryTemplate() {
6969
return this.retryTemplate;
7070
}
7171

72-
public RecoveryCallback<? extends Object> getRecoveryCallback() {
72+
public @Nullable RecoveryCallback<? extends Object> getRecoveryCallback() {
7373
return this.recoveryCallback;
7474
}
7575

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public class BatchMessagingMessageListenerAdapter<K, V> extends MessagingMessage
6262

6363
private BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
6464

65-
private BatchToRecordAdapter<K, V> batchToRecordAdapter;
65+
private @Nullable BatchToRecordAdapter<K, V> batchToRecordAdapter;
6666

6767
/**
6868
* Create an instance with the provided parameters.

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchToRecordAdapter.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.kafka.support.Acknowledgment;
2526
import org.springframework.messaging.Message;
@@ -47,7 +48,7 @@ public interface BatchToRecordAdapter<K, V> {
4748
* @param consumer the consumer.
4849
* @param callback the callback.
4950
*/
50-
void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
51+
void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment ack,
5152
Consumer<?, ?> consumer, Callback<K, V> callback);
5253

5354
/**
@@ -66,7 +67,7 @@ interface Callback<K, V> {
6667
* @param consumer the consumer.
6768
* @param message the message.
6869
*/
69-
void invoke(ConsumerRecord<K, V> record, Acknowledgment ack, Consumer<?, ?> consumer,
70+
void invoke(ConsumerRecord<K, V> record, @Nullable Acknowledgment ack, Consumer<?, ?> consumer,
7071
Message<?> message);
7172

7273
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/ConvertingMessageListener.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2022 the original author or authors.
2+
* Copyright 2016-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121

2222
import org.apache.kafka.clients.consumer.Consumer;
2323
import org.apache.kafka.clients.consumer.ConsumerRecord;
24+
import org.jspecify.annotations.Nullable;
2425

2526
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
2627
import org.springframework.kafka.listener.AcknowledgingMessageListener;
@@ -60,7 +61,7 @@ public class ConvertingMessageListener<V> implements DelegatingMessageListener<M
6061

6162
private MessageConverter messageConverter;
6263

63-
private KafkaHeaderMapper headerMapper;
64+
private @Nullable KafkaHeaderMapper headerMapper;
6465

6566
/**
6667
* Construct an instance with the provided {@link MessageListener} and {@link Class}
@@ -106,7 +107,7 @@ public MessageListener getDelegate() {
106107

107108
@Override
108109
@SuppressWarnings("unchecked")
109-
public void onMessage(ConsumerRecord receivedRecord, Acknowledgment acknowledgment, Consumer consumer) {
110+
public void onMessage(ConsumerRecord receivedRecord, @Nullable Acknowledgment acknowledgment, Consumer consumer) {
110111
ConsumerRecord convertedConsumerRecord = convertConsumerRecord(receivedRecord);
111112
if (this.delegate instanceof AcknowledgingConsumerAwareMessageListener) {
112113
this.delegate.onMessage(convertedConsumerRecord, acknowledgment, consumer);

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DefaultBatchToRecordAdapter.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020

2121
import org.apache.kafka.clients.consumer.Consumer;
2222
import org.apache.kafka.clients.consumer.ConsumerRecord;
23+
import org.jspecify.annotations.Nullable;
2324

2425
import org.springframework.core.log.LogAccessor;
2526
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
@@ -62,7 +63,7 @@ public DefaultBatchToRecordAdapter(ConsumerRecordRecoverer recoverer) {
6263
}
6364

6465
@Override
65-
public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, Acknowledgment ack,
66+
public void adapt(List<Message<?>> messages, List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment ack,
6667
Consumer<?, ?> consumer, Callback<K, V> callback) {
6768

6869
for (int i = 0; i < messages.size(); i++) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public class DelegatingInvocableHandler {
6969
private final ConcurrentMap<InvocableHandlerMethod, MethodParameter> payloadMethodParameters =
7070
new ConcurrentHashMap<>();
7171

72-
private final InvocableHandlerMethod defaultHandler;
72+
private final @Nullable InvocableHandlerMethod defaultHandler;
7373

7474
private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new ConcurrentHashMap<>();
7575

@@ -79,13 +79,13 @@ public class DelegatingInvocableHandler {
7979

8080
private final Object bean;
8181

82-
private final BeanExpressionResolver resolver;
82+
private final @Nullable BeanExpressionResolver resolver;
8383

84-
private final BeanExpressionContext beanExpressionContext;
84+
private final @Nullable BeanExpressionContext beanExpressionContext;
8585

86-
private final ConfigurableListableBeanFactory beanFactory;
86+
private final @Nullable ConfigurableListableBeanFactory beanFactory;
8787

88-
private final PayloadValidator validator;
88+
private final @Nullable PayloadValidator validator;
8989

9090
private final boolean asyncReplies;
9191

@@ -168,7 +168,8 @@ public boolean isAsyncReplies() {
168168
* @throws Exception raised if no suitable argument resolver can be found,
169169
* or the method raised an exception.
170170
*/
171-
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
171+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
172+
public Object invoke(Message<?> message, @Nullable Object... providedArgs) throws Exception { //NOSONAR
172173
Class<?> payloadClass = message.getPayload().getClass();
173174
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
174175
if (this.validator != null && this.defaultHandler != null) {
@@ -345,6 +346,7 @@ private boolean assignPayload(MethodParameter methodParameter, Class<?> payloadC
345346
* @since 3.2
346347
*/
347348
@Nullable
349+
@SuppressWarnings("NullAway") // Dataflow analysis limitation
348350
public InvocationResult getInvocationResultFor(Object result, Object inboundPayload) {
349351
InvocableHandlerMethod handler = findHandlerForPayload(inboundPayload.getClass());
350352
if (handler != null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringBatchMessageListenerAdapter.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public FilteringBatchMessageListenerAdapter(BatchMessageListener<K, V> delegate,
7878

7979
@Override
8080
public void onMessage(List<ConsumerRecord<K, V>> records, @Nullable Acknowledgment acknowledgment,
81-
Consumer<?, ?> consumer) {
81+
@Nullable Consumer<?, ?> consumer) {
8282

8383
final RecordFilterStrategy<K, V> recordFilterStrategy = getRecordFilterStrategy();
8484
final List<ConsumerRecord<K, V>> consumerRecords = recordFilterStrategy.filterBatch(records);
@@ -100,8 +100,8 @@ else if (!consumerRecords.isEmpty() || this.consumerAware
100100
}
101101
}
102102

103-
private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment,
104-
Consumer<?, ?> consumer) {
103+
private void invokeDelegate(List<ConsumerRecord<K, V>> consumerRecords, @Nullable Acknowledgment acknowledgment,
104+
@Nullable Consumer<?, ?> consumer) {
105105

106106
switch (this.delegateType) {
107107
case ACKNOWLEDGING_CONSUMER_AWARE:
@@ -129,12 +129,12 @@ public void onMessage(List<ConsumerRecord<K, V>> data) {
129129
}
130130

131131
@Override
132-
public void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment) {
132+
public void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Acknowledgment acknowledgment) {
133133
onMessage(data, acknowledgment, null); // NOSONAR
134134
}
135135

136136
@Override
137-
public void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer) {
137+
public void onMessage(List<ConsumerRecord<K, V>> data, @Nullable Consumer<?, ?> consumer) {
138138
onMessage(data, null, consumer);
139139
}
140140

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/FilteringMessageListenerAdapter.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public FilteringMessageListenerAdapter(MessageListener<K, V> delegate,
6666

6767
@Override
6868
public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment,
69-
Consumer<?, ?> consumer) {
69+
@Nullable Consumer<?, ?> consumer) {
7070

7171
if (!filter(consumerRecord)) {
7272
switch (this.delegateType) {
@@ -104,12 +104,12 @@ public void onMessage(ConsumerRecord<K, V> data) {
104104
}
105105

106106
@Override
107-
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
107+
public void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment) {
108108
onMessage(data, acknowledgment, null); // NOSONAR
109109
}
110110

111111
@Override
112-
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
112+
public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
113113
onMessage(data, null, consumer);
114114
}
115115

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.kafka.listener.adapter;
1818

1919
import java.lang.reflect.Method;
20+
import java.util.Objects;
2021

2122
import org.jspecify.annotations.Nullable;
2223

@@ -35,9 +36,9 @@
3536
*/
3637
public class HandlerAdapter {
3738

38-
private final InvocableHandlerMethod invokerHandlerMethod;
39+
private final @Nullable InvocableHandlerMethod invokerHandlerMethod;
3940

40-
private final DelegatingInvocableHandler delegatingHandler;
41+
private final @Nullable DelegatingInvocableHandler delegatingHandler;
4142

4243
private final boolean asyncReplies;
4344

@@ -74,11 +75,11 @@ public boolean isAsyncReplies() {
7475
}
7576

7677
@Nullable
77-
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
78+
public Object invoke(Message<?> message, @Nullable Object... providedArgs) throws Exception { //NOSONAR
7879
if (this.invokerHandlerMethod != null) {
7980
return this.invokerHandlerMethod.invoke(message, providedArgs); // NOSONAR
8081
}
81-
else if (this.delegatingHandler.hasDefaultHandler()) {
82+
else if (Objects.requireNonNull(this.delegatingHandler).hasDefaultHandler()) {
8283
// Needed to avoid returning raw Message which matches Object
8384
Object[] args = new Object[providedArgs.length + 1];
8485
args[0] = message.getPayload();
@@ -95,7 +96,7 @@ public String getMethodAsString(Object payload) {
9596
return this.invokerHandlerMethod.getMethod().toGenericString();
9697
}
9798
else {
98-
return this.delegatingHandler.getMethodNameFor(payload);
99+
return Objects.requireNonNull(this.delegatingHandler).getMethodNameFor(payload);
99100
}
100101
}
101102

@@ -104,7 +105,7 @@ public Object getBean() {
104105
return this.invokerHandlerMethod.getBean();
105106
}
106107
else {
107-
return this.delegatingHandler.getBean();
108+
return Objects.requireNonNull(this.delegatingHandler).getBean();
108109
}
109110
}
110111

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/KafkaBackoffAwareMessageListenerAdapter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgm
101101
}
102102
}
103103

104-
private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
104+
private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, @Nullable Acknowledgment acknowledgment, @Nullable Consumer<?, ?> consumer) {
105105
switch (this.delegateType) {
106106
case ACKNOWLEDGING_CONSUMER_AWARE -> this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
107107
case ACKNOWLEDGING -> this.delegate.onMessage(consumerRecord, acknowledgment);
@@ -111,7 +111,7 @@ private void invokeDelegateOnMessage(ConsumerRecord<K, V> consumerRecord, Acknow
111111
}
112112

113113
private KafkaConsumerBackoffManager.Context createContext(ConsumerRecord<K, V> data, long nextExecutionTimestamp,
114-
Consumer<?, ?> consumer) {
114+
@Nullable Consumer<?, ?> consumer) {
115115

116116
return this.kafkaConsumerBackoffManager.createContext(nextExecutionTimestamp, this.listenerId,
117117
new TopicPartition(data.topic(), data.partition()), consumer);
@@ -135,12 +135,12 @@ public void onMessage(ConsumerRecord<K, V> data) {
135135
}
136136

137137
@Override
138-
public void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment) {
138+
public void onMessage(ConsumerRecord<K, V> data, @Nullable Acknowledgment acknowledgment) {
139139
onMessage(data, acknowledgment, null); // NOSONAR
140140
}
141141

142142
@Override
143-
public void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer) {
143+
public void onMessage(ConsumerRecord<K, V> data, @Nullable Consumer<?, ?> consumer) {
144144
onMessage(data, null, consumer);
145145
}
146146
}

0 commit comments

Comments
 (0)