Skip to content

Commit 474651d

Browse files
Add NonNullApi into 'annotation' package
1 parent ef719b5 commit 474651d

File tree

8 files changed

+66
-24
lines changed

8 files changed

+66
-24
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/DltHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828

2929
/**
3030
* Annotation to determine the method that should process the DLT topic message.
31-
* The method can have the same parameters as a {@link KafkaListener} method can (Message, Acknowledgement, etc).
31+
* The method can have the same parameters as a {@link KafkaListener} method can have (Message, Acknowledgement, etc).
3232
*
33+
* <p>
3334
* The annotated method must be in the same class as the corresponding {@link KafkaListener} annotation.
3435
*
3536
* @author Tomaz Fernandes

spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableKafkaRetryTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -55,6 +55,7 @@
5555
* bean. This annotation is meta-annotated with {@code @EnableKafka} so it is not
5656
* necessary to specify both.
5757
*
58+
* <p>
5859
* To configure the feature's components, extend the
5960
* {@link RetryTopicConfigurationSupport} class and override the appropriate methods on a
6061
* {@link Configuration @Configuration} class, such as:

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaNullAwarePayloadArgumentResolver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.springframework.core.MethodParameter;
2222
import org.springframework.kafka.support.KafkaNull;
23+
import org.springframework.lang.Nullable;
2324
import org.springframework.messaging.Message;
2425
import org.springframework.messaging.converter.MessageConverter;
2526
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
@@ -59,7 +60,7 @@ public Object resolveArgument(MethodParameter parameter, Message<?> message) thr
5960
}
6061

6162
@Override
62-
protected boolean isEmptyPayload(Object payload) {
63+
protected boolean isEmptyPayload(@Nullable Object payload) {
6364
return payload == null || payload instanceof KafkaNull;
6465
}
6566

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.core.annotation.RepeatableContainers;
3434
import org.springframework.core.log.LogAccessor;
3535
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
36+
import org.springframework.lang.Nullable;
3637

3738

3839
/**
@@ -42,9 +43,11 @@
4243
* one from a {@link RetryableTopic} annotation, or from the bean container if no
4344
* annotation is available.
4445
*
46+
* <p>
4547
* If beans are found in the container there's a check to determine whether or not the
4648
* provided topics should be handled by any of such instances.
4749
*
50+
* <p>
4851
* If the annotation is provided, a
4952
* {@link org.springframework.kafka.annotation.DltHandler} annotated method is looked up.
5053
*
@@ -58,10 +61,13 @@
5861
*/
5962
public class RetryTopicConfigurationProvider {
6063

64+
@Nullable
6165
private final BeanFactory beanFactory;
6266

67+
@Nullable
6368
private final BeanExpressionResolver resolver;
6469

70+
@Nullable
6571
private final BeanExpressionContext expressionContext;
6672

6773
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(RetryTopicConfigurationProvider.class));
@@ -71,7 +77,7 @@ public class RetryTopicConfigurationProvider {
7177
* expression context.
7278
* @param beanFactory the bean factory.
7379
*/
74-
public RetryTopicConfigurationProvider(BeanFactory beanFactory) {
80+
public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory) {
7581
this(beanFactory, new StandardBeanExpressionResolver(), beanFactory instanceof ConfigurableBeanFactory
7682
? new BeanExpressionContext((ConfigurableBeanFactory) beanFactory, null)
7783
: null); // NOSONAR
@@ -83,13 +89,14 @@ public RetryTopicConfigurationProvider(BeanFactory beanFactory) {
8389
* @param resolver the bean expression resolver.
8490
* @param expressionContext the bean expression context.
8591
*/
86-
public RetryTopicConfigurationProvider(BeanFactory beanFactory, BeanExpressionResolver resolver,
87-
BeanExpressionContext expressionContext) {
92+
public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver,
93+
@Nullable BeanExpressionContext expressionContext) {
8894

8995
this.beanFactory = beanFactory;
9096
this.resolver = resolver;
9197
this.expressionContext = expressionContext;
9298
}
99+
@Nullable
93100
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
94101
RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
95102
RepeatableContainers.none())
@@ -102,6 +109,7 @@ public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method
102109
: maybeGetFromContext(topics);
103110
}
104111

112+
@Nullable
105113
private RetryTopicConfiguration maybeGetFromContext(String[] topics) {
106114
if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) {
107115
LOGGER.warn("No ListableBeanFactory found, skipping RetryTopic configuration.");

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 30 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.lang.reflect.Method;
2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22+
import java.util.Collections;
2223
import java.util.List;
24+
import java.util.Objects;
2325

2426
import org.springframework.beans.factory.BeanFactory;
2527
import org.springframework.beans.factory.BeanInitializationException;
@@ -38,6 +40,7 @@
3840
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
3941
import org.springframework.kafka.retrytopic.RetryTopicConstants;
4042
import org.springframework.kafka.support.EndpointHandlerMethod;
43+
import org.springframework.lang.Nullable;
4144
import org.springframework.retry.annotation.Backoff;
4245
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
4346
import org.springframework.retry.backoff.ExponentialRandomBackOffPolicy;
@@ -70,10 +73,13 @@ public class RetryableTopicAnnotationProcessor {
7073

7174
private static final String CSQ_FOR_OSQ = "] for [";
7275

76+
@Nullable
7377
private final BeanFactory beanFactory;
7478

79+
@Nullable
7580
private final BeanExpressionResolver resolver;
7681

82+
@Nullable
7783
private final BeanExpressionContext expressionContext;
7884

7985
/**
@@ -93,8 +99,8 @@ public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
9399
* @param resolver the bean expression resolver.
94100
* @param expressionContext the bean expression context.
95101
*/
96-
public RetryableTopicAnnotationProcessor(BeanFactory beanFactory, BeanExpressionResolver resolver,
97-
BeanExpressionContext expressionContext) {
102+
public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nullable BeanExpressionResolver resolver,
103+
@Nullable BeanExpressionContext expressionContext) {
98104

99105
this.beanFactory = beanFactory;
100106
this.resolver = resolver;
@@ -116,12 +122,10 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
116122
boolean traverse = false;
117123
if (StringUtils.hasText(annotation.traversingCauses())) {
118124
Boolean traverseResolved = resolveExpressionAsBoolean(annotation.traversingCauses(), "traversingCauses");
119-
if (traverseResolved != null) {
120-
traverse = traverseResolved;
121-
}
122-
else {
123-
traverse = !includes.isEmpty() || !excludes.isEmpty();
124-
}
125+
traverse = Objects.requireNonNullElseGet(
126+
traverseResolved,
127+
() -> !includes.isEmpty() || !excludes.isEmpty()
128+
);
125129
}
126130
Boolean autoStartDlt = null;
127131
if (StringUtils.hasText(annotation.autoStartDltHandler())) {
@@ -157,9 +161,11 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
157161
return builder.create(getKafkaTemplate(resolveExpressionAsString(annotation.kafkaTemplate(), "kafkaTemplate"), topics));
158162
}
159163

160-
private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, BeanFactory beanFactory) { // NOSONAR
164+
private SleepingBackOffPolicy<?> createBackoffFromAnnotation(Backoff backoff, @Nullable BeanFactory beanFactory) { // NOSONAR
161165
StandardEvaluationContext evaluationContext = new StandardEvaluationContext();
162-
evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory));
166+
if (beanFactory != null) {
167+
evaluationContext.setBeanResolver(new BeanFactoryResolver(beanFactory));
168+
}
163169

164170
// Code from Spring Retry
165171
Long min = backoff.delay() == 0 ? backoff.value() : backoff.delay();
@@ -210,7 +216,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
210216
.orElse(RetryTopicConfigurer.DEFAULT_DLT_HANDLER);
211217
}
212218

213-
private KafkaOperations<?, ?> getKafkaTemplate(String kafkaTemplateName, String[] topics) {
219+
private KafkaOperations<?, ?> getKafkaTemplate(@Nullable String kafkaTemplateName, String[] topics) {
214220
if (StringUtils.hasText(kafkaTemplateName)) {
215221
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain kafka template by bean name");
216222
try {
@@ -236,6 +242,7 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
236242
}
237243
}
238244

245+
@Nullable
239246
private String resolveExpressionAsString(String value, String attribute) {
240247
Object resolved = resolveExpression(value);
241248
if (resolved instanceof String str) {
@@ -248,6 +255,7 @@ else if (resolved != null) {
248255
return null;
249256
}
250257

258+
@Nullable
251259
private Integer resolveExpressionAsInteger(String value, String attribute, boolean required) {
252260
Object resolved = resolveExpression(value);
253261
Integer result = null;
@@ -268,6 +276,8 @@ else if (resolved != null || required) {
268276
return result;
269277
}
270278

279+
@SuppressWarnings("SameParameterValue")
280+
@Nullable
271281
private Short resolveExpressionAsShort(String value, String attribute, boolean required) {
272282
Object resolved = resolveExpression(value);
273283
Short result = null;
@@ -288,6 +298,7 @@ else if (resolved != null || required) {
288298
return result;
289299
}
290300

301+
@Nullable
291302
private Long resolveExpressionAsLong(String value, String attribute, boolean required) {
292303
Object resolved = resolveExpression(value);
293304
Long result = null;
@@ -308,6 +319,8 @@ else if (resolved != null || required) {
308319
return result;
309320
}
310321

322+
@SuppressWarnings("SameParameterValue")
323+
@Nullable
311324
private Double resolveExpressionAsDouble(String value, String attribute, boolean required) {
312325
Object resolved = resolveExpression(value);
313326
Double result = null;
@@ -328,6 +341,7 @@ else if (resolved != null || required) {
328341
return result;
329342
}
330343

344+
@Nullable
331345
private Boolean resolveExpressionAsBoolean(String value, String attribute) {
332346
Object resolved = resolveExpression(value);
333347
Boolean result = null;
@@ -349,7 +363,8 @@ else if (resolved != null) {
349363
private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwable>[] fromAnnot, String[] names,
350364
String type) {
351365

352-
List<Class<? extends Throwable>> classes = new ArrayList<>(Arrays.asList(fromAnnot));
366+
List<Class<? extends Throwable>> classes = new ArrayList<>(fromAnnot.length + names.length);
367+
Collections.addAll(classes, fromAnnot);
353368
try {
354369
for (String name : names) {
355370
Class<?> clazz = ClassUtils.forName(name, ClassUtils.getDefaultClassLoader());
@@ -365,8 +380,9 @@ private List<Class<? extends Throwable>> resolveClasses(Class<? extends Throwabl
365380
return classes;
366381
}
367382

383+
@Nullable
368384
private Object resolveExpression(String value) {
369-
if (this.expressionContext != null) {
385+
if (this.expressionContext != null && this.resolver != null) {
370386
String resolved = resolve(value);
371387
return this.resolver.evaluate(resolved, this.expressionContext);
372388
}
@@ -375,6 +391,7 @@ private Object resolveExpression(String value) {
375391
}
376392
}
377393

394+
@Nullable
378395
private String resolve(String value) {
379396
if (this.beanFactory instanceof ConfigurableBeanFactory cbf) {
380397
return cbf.resolveEmbeddedValue(value);
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
/**
22
* Package for kafka annotations
33
*/
4+
@org.springframework.lang.NonNullApi
5+
@org.springframework.lang.NonNullFields
46
package org.springframework.kafka.annotation;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.listener.ListenerContainerRegistry;
3030
import org.springframework.kafka.listener.MessageListenerContainer;
3131
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
32+
import org.springframework.lang.Nullable;
3233

3334
/**
3435
* Provide the component instances that will be used with
@@ -154,7 +155,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
154155
* @param applicationContext the application context.
155156
* @return the instance.
156157
*/
157-
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
158+
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(@Nullable ListenerContainerRegistry registry,
158159
ApplicationContext applicationContext) {
159160

160161
return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext);

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -310,7 +310,7 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
310310
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
311311
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
312312
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
313-
ListenerContainerRegistry registry,
313+
@Nullable ListenerContainerRegistry registry,
314314
ObjectProvider<RetryTopicComponentFactory> componentFactoryProvider,
315315
@Nullable RetryTopicSchedulerWrapper wrapper,
316316
@Nullable TaskScheduler taskScheduler) {
@@ -325,7 +325,7 @@ public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContex
325325
}
326326

327327
private void configurePartitionPausingFactory(ContainerPartitionPausingBackOffManagerFactory factory,
328-
ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) {
328+
@Nullable ListenerContainerRegistry registry, @Nullable TaskScheduler scheduler) {
329329

330330
Assert.notNull(scheduler, "Either a RetryTopicSchedulerWrapper or TaskScheduler bean is required");
331331
factory.setBackOffHandler(new ContainerPausingBackOffHandler(
@@ -346,8 +346,10 @@ protected RetryTopicComponentFactory createComponentFactory() {
346346
*/
347347
public static class BlockingRetriesConfigurer {
348348

349+
@Nullable
349350
private BackOff backOff;
350351

352+
@Nullable
351353
private Class<? extends Exception>[] retryableExceptions;
352354

353355
/**
@@ -378,10 +380,12 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) {
378380
return this;
379381
}
380382

383+
@Nullable
381384
BackOff getBackOff() {
382385
return this.backOff;
383386
}
384387

388+
@Nullable
385389
Class<? extends Exception>[] getRetryableExceptions() {
386390
return this.retryableExceptions;
387391
}
@@ -393,10 +397,13 @@ Class<? extends Exception>[] getRetryableExceptions() {
393397
*/
394398
public static class CustomizersConfigurer {
395399

400+
@Nullable
396401
private Consumer<DefaultErrorHandler> errorHandlerCustomizer;
397402

403+
@Nullable
398404
private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;
399405

406+
@Nullable
400407
private Consumer<DeadLetterPublishingRecoverer> deadLetterPublishingRecovererCustomizer;
401408

402409
/**
@@ -406,6 +413,7 @@ public static class CustomizersConfigurer {
406413
* @return the configurer.
407414
* @see DefaultErrorHandler
408415
*/
416+
@SuppressWarnings("unused")
409417
public CustomizersConfigurer customizeErrorHandler(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
410418
this.errorHandlerCustomizer = errorHandlerCustomizer;
411419
return this;
@@ -433,14 +441,17 @@ public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<Dea
433441
return this;
434442
}
435443

444+
@Nullable
436445
Consumer<DefaultErrorHandler> getErrorHandlerCustomizer() {
437446
return this.errorHandlerCustomizer;
438447
}
439448

449+
@Nullable
440450
Consumer<ConcurrentMessageListenerContainer<?, ?>> getListenerContainerCustomizer() {
441451
return this.listenerContainerCustomizer;
442452
}
443453

454+
@Nullable
444455
Consumer<DeadLetterPublishingRecoverer> getDeadLetterPublishingRecovererCustomizer() {
445456
return this.deadLetterPublishingRecovererCustomizer;
446457
}

0 commit comments

Comments
 (0)