Skip to content

Commit b2b2262

Browse files
committed
Resolve new Sonar Issues
1 parent 777a930 commit b2b2262

17 files changed

+264
-212
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,7 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
399399
.findRetryConfigurationFor(kafkaListener.topics(), methodToUse, bean);
400400

401401
if (retryTopicConfiguration == null) {
402-
this.logger.debug("No retry topic configuration found for topics " + kafkaListener.topics() + ".");
402+
this.logger.debug("No retry topic configuration found for topics " + Arrays.asList(kafkaListener.topics()));
403403
return false;
404404
}
405405

@@ -413,15 +413,14 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
413413
}
414414

415415
private RetryTopicConfigurer getRetryTopicConfigurer() {
416-
RetryTopicConfigurer retryTopicConfigurer;
417416
try {
418417
return this.beanFactory.getBean(RetryTopicConfigurer.class);
419418
}
420419
catch (NoSuchBeanDefinitionException e) {
421420
if (!(this.beanFactory instanceof AutowireCapableBeanFactory)) {
422421
throw new IllegalStateException("BeanFactory must be an instance of "
423422
+ AutowireCapableBeanFactory.class.getSimpleName()
424-
+ " Provided beanFactory: " + this.beanFactory.getClass().getSimpleName());
423+
+ " Provided beanFactory: " + this.beanFactory.getClass().getSimpleName(), e);
425424
}
426425
((AutowireCapableBeanFactory) this.beanFactory)
427426
.createBean(RetryTopicBootstrapper.class)
@@ -471,9 +470,9 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
471470
}
472471

473472
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
474-
Object bean, Object adminTarget, String beanName,
475-
Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> endpointCustomizer,
476-
Function<KafkaListenerContainerFactory<?>, KafkaListenerContainerFactory<?>> containerFactoryCustomizer) {
473+
Object bean, Object adminTarget, String beanName,
474+
Function<MethodKafkaListenerEndpoint<?, ?>, MethodKafkaListenerEndpoint<?, ?>> endpointCustomizer,
475+
Function<KafkaListenerContainerFactory<?>, KafkaListenerContainerFactory<?>> containerFactoryCustomizer) {
477476

478477
String beanRef = kafkaListener.beanRef();
479478
if (StringUtils.hasText(beanRef)) {
@@ -505,18 +504,18 @@ protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, Kafka
505504
resolveKafkaProperties(endpoint, kafkaListener.properties());
506505
endpoint.setSplitIterables(kafkaListener.splitIterables());
507506

508-
endpoint = endpointCustomizer.apply(endpoint);
507+
MethodKafkaListenerEndpoint<?, ?> customizedEndpoint = endpointCustomizer.apply(endpoint);
509508

510509
String containerFactory = resolve(kafkaListener.containerFactory());
511510
KafkaListenerContainerFactory<?> containerFactoryInstanceOrNull = containerFactoryCustomizer
512511
.apply(resolveContainerFactory(kafkaListener, containerFactory, beanName));
513512

514-
this.registrar.registerEndpoint(endpoint, containerFactoryInstanceOrNull);
513+
this.registrar.registerEndpoint(customizedEndpoint, containerFactoryInstanceOrNull);
515514

516-
endpoint.setBeanFactory(this.beanFactory);
515+
customizedEndpoint.setBeanFactory(this.beanFactory);
517516
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
518517
if (StringUtils.hasText(errorHandlerBeanName)) {
519-
resolveErrorHandler(endpoint, kafkaListener);
518+
resolveErrorHandler(customizedEndpoint, kafkaListener);
520519
}
521520
if (StringUtils.hasText(beanRef)) {
522521
this.listenerScope.removeListener(beanRef);

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,21 +24,22 @@
2424

2525
import org.springframework.kafka.retrytopic.DltStrategy;
2626
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
27-
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
2827
import org.springframework.kafka.retrytopic.RetryTopicConstants;
2928
import org.springframework.kafka.retrytopic.TopicSuffixingStrategy;
3029
import org.springframework.retry.annotation.Backoff;
3130
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;
3231

3332
/**
3433
*
35-
* Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated listener.
36-
* See {@link RetryTopicConfigurer for usage examples.}
34+
* Annotation to create the retry and dlt topics for a {@link KafkaListener} annotated
35+
* listener. See {@link org.springframework.kafka.retrytopic.RetryTopicConfigurer} for
36+
* usage examples.
3737
*
3838
* @author Tomaz Fernandes
39+
* @author Gary Russell
3940
* @since 2.7
4041
*
41-
* @see RetryTopicConfigurer
42+
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
4243
*/
4344
@Target({ ElementType.METHOD })
4445
@Retention(RetentionPolicy.RUNTIME)
@@ -62,8 +63,8 @@
6263

6364
/**
6465
*
65-
* The amount of time in milliseconds after which message retrying should give up
66-
* and send the message to the DLT.
66+
* The amount of time in milliseconds after which message retrying should give up and
67+
* send the message to the DLT.
6768
*
6869
* @return the timeout value.
6970
*
@@ -72,27 +73,30 @@
7273

7374
/**
7475
*
75-
* The bean name of the {@link org.springframework.kafka.core.KafkaTemplate} bean
76-
* that will be used to forward the message to the retry and Dlt topics. If not specified,
76+
* The bean name of the {@link org.springframework.kafka.core.KafkaTemplate} bean that
77+
* will be used to forward the message to the retry and Dlt topics. If not specified,
7778
* a bean with name retryTopicDefaultKafkaTemplate will be looked up.
7879
*
7980
* @return the kafkaTemplate bean name.
8081
*/
8182
String kafkaTemplate() default "";
8283

8384
/**
84-
* The bean name of the {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory} that will be used to create
85-
* the consumers for the retry and dlt topics. If none is provided, the one from the {@link KafkaListener} annotation
86-
* is used, or else a default one, if any.
85+
* The bean name of the
86+
* {@link org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory}
87+
* that will be used to create the consumers for the retry and dlt topics. If none is
88+
* provided, the one from the {@link KafkaListener} annotation is used, or else a
89+
* default one, if any.
8790
*
8891
* @return the listenerContainerFactory bean name.
8992
*/
9093
String listenerContainerFactory() default "";
9194

9295
/**
93-
* Whether or not the topic should be created after registration with the provided configurations.
94-
* Not to be confused with the ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG from Kafka
95-
* configuration, which is handled by the {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
96+
* Whether or not the topic should be created after registration with the provided
97+
* configurations. Not to be confused with the
98+
* ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG from Kafka configuration, which is
99+
* handled by the {@link org.apache.kafka.clients.consumer.KafkaConsumer}.
96100
*
97101
* @return the configuration.
98102
*/
@@ -120,50 +124,46 @@
120124
Class<? extends Throwable>[] include() default {};
121125

122126
/**
123-
* The exceptions that should not be retried.
124-
* When the message processing throws these exceptions
125-
* the message goes straight to the DLT.
127+
* The exceptions that should not be retried. When the message processing throws these
128+
* exceptions the message goes straight to the DLT.
126129
*
127130
* @return the exceptions not to be retried.
128131
*/
129132
Class<? extends Throwable>[] exclude() default {};
130133

131134
/**
132-
* Whether or not the captured exception should be
133-
* traversed to look for the exceptions provided above.
135+
* Whether or not the captured exception should be traversed to look for the
136+
* exceptions provided above.
134137
*
135138
* @return the value.
136139
*/
137140
boolean traversingCauses() default false;
138141

139142
/**
140-
* The suffix that will be appended to the main topic in order to generate
141-
* the retry topics. The corresponding delay value is also appended.
143+
* The suffix that will be appended to the main topic in order to generate the retry
144+
* topics. The corresponding delay value is also appended.
142145
*
143146
* @return the retry topics' suffix.
144147
*/
145148
String retryTopicSuffix() default RetryTopicConstants.DEFAULT_RETRY_SUFFIX;
146149

147150
/**
148-
* The suffix that will be appended to the main topic in order to generate
149-
* the dlt topic.
151+
* The suffix that will be appended to the main topic in order to generate the dlt
152+
* topic.
150153
*
151154
* @return the dlt suffix.
152155
*/
153156
String dltTopicSuffix() default RetryTopicConstants.DEFAULT_DLT_SUFFIX;
154157

155-
156158
/**
157-
* The suffix that will be appended to the main topic in order to generate
158-
* the dlt topic.
159+
* The suffix that will be appended to the main topic in order to generate the dlt
160+
* topic.
159161
*
160162
* @return the dlt suffix.
161163
*/
162164
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;
163165

164-
DltStrategy dltStrategy()
165-
default DltStrategy.ALWAYS_RETRY_ON_ERROR;
166+
DltStrategy dltStrategy() default DltStrategy.ALWAYS_RETRY_ON_ERROR;
166167

167-
FixedDelayStrategy fixedDelayTopicStrategy()
168-
default FixedDelayStrategy.MULTIPLE_TOPICS;
168+
FixedDelayStrategy fixedDelayTopicStrategy() default FixedDelayStrategy.MULTIPLE_TOPICS;
169169
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,29 +22,25 @@
2222
import java.util.HashMap;
2323
import java.util.Map;
2424

25-
import org.apache.commons.logging.LogFactory;
2625
import org.apache.kafka.common.TopicPartition;
2726

2827
import org.springframework.beans.factory.annotation.Qualifier;
2928
import org.springframework.context.ApplicationListener;
30-
import org.springframework.core.log.LogAccessor;
3129
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
3230
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
3331

3432
/**
3533
*
36-
* A manager that backs off consumption for a given topic if the timestamp provided is not due.
37-
* Use with {@link SeekToCurrentErrorHandler} to guarantee that the message is read again after
38-
* partition consumption is resumed (or seek it manually by other means).
34+
* A manager that backs off consumption for a given topic if the timestamp provided is not
35+
* due. Use with {@link SeekToCurrentErrorHandler} to guarantee that the message is read
36+
* again after partition consumption is resumed (or seek it manually by other means).
3937
*
4038
* @author Tomaz Fernandes
4139
* @since 2.7
4240
* @see SeekToCurrentErrorHandler
4341
*/
4442
public class KafkaConsumerBackoffManager implements ApplicationListener<ListenerContainerPartitionIdleEvent> {
4543

46-
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
47-
4844
/**
4945
* Internal Back Off Clock Bean Name.
5046
*/
@@ -121,17 +117,17 @@ public static class Context {
121117
/**
122118
* The time after which the message should be processed.
123119
*/
124-
final long dueTimestamp;
120+
final long dueTimestamp; // NOSONAR
125121

126122
/**
127123
* The id for the listener that should be paused.
128124
*/
129-
final String listenerId;
125+
final String listenerId; // NOSONAR
130126

131127
/**
132128
* The topic that contains the partition to be paused.
133129
*/
134-
final TopicPartition topicPartition;
130+
final TopicPartition topicPartition; // NOSONAR
135131

136132
Context(long dueTimestamp, String listenerId, TopicPartition topicPartition) {
137133
this.dueTimestamp = dueTimestamp;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
688688

689689
private volatile long lastPoll = System.currentTimeMillis();
690690

691-
private Set<TopicPartition> pausedPartitions;
691+
private final Set<TopicPartition> pausedPartitions;
692692

693693
@SuppressWarnings(UNCHECKED)
694694
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
@@ -1262,12 +1262,12 @@ private void checkIdlePartitions() {
12621262
private void checkIdlePartition(TopicPartition topicPartition) {
12631263
if (this.containerProperties.getIdlePartitionEventInterval() != null) {
12641264
long now = System.currentTimeMillis();
1265-
Long lastReceive = this.lastReceivePartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1266-
Long lastAlertAt = this.lastAlertPartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1267-
if (now > lastReceive + this.containerProperties.getIdlePartitionEventInterval()
1268-
&& now > lastAlertAt + this.containerProperties.getIdlePartitionEventInterval()) {
1265+
Long lstReceive = this.lastReceivePartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1266+
Long lstAlertAt = this.lastAlertPartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1267+
if (now > lstReceive + this.containerProperties.getIdlePartitionEventInterval()
1268+
&& now > lstAlertAt + this.containerProperties.getIdlePartitionEventInterval()) {
12691269
this.wasIdlePartition.put(topicPartition, true);
1270-
publishIdlePartitionEvent(now - lastReceive, topicPartition, this.consumer,
1270+
publishIdlePartitionEvent(now - lstReceive, topicPartition, this.consumer,
12711271
isPartitionPauseRequested(topicPartition));
12721272
this.lastAlertPartition.put(topicPartition, now);
12731273
if (this.consumerSeekAwareListener != null) {
@@ -1285,11 +1285,11 @@ private void notIdlePartitions(Set<TopicPartition> partitions) {
12851285

12861286
private void notIdlePartition(TopicPartition topicPartition) {
12871287
long now = System.currentTimeMillis();
1288-
Boolean wasIdle = this.wasIdlePartition.get(topicPartition);
1289-
if (wasIdle != null && wasIdle) {
1288+
Boolean partitionWasIdle = this.wasIdlePartition.get(topicPartition);
1289+
if (partitionWasIdle != null && partitionWasIdle) {
12901290
this.wasIdlePartition.put(topicPartition, false);
1291-
Long lastReceive = this.lastReceivePartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1292-
publishNoLongerIdlePartitionEvent(now - this.lastReceive, this.consumer, topicPartition);
1291+
Long lstReceive = this.lastReceivePartition.computeIfAbsent(topicPartition, newTopicPartition -> now);
1292+
publishNoLongerIdlePartitionEvent(now - lstReceive, this.consumer, topicPartition);
12931293
}
12941294
this.lastReceivePartition.put(topicPartition, now);
12951295
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/BackOffValuesGenerator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ public class BackOffValuesGenerator {
4949

5050
public BackOffValuesGenerator(int providedMaxAttempts, BackOffPolicy providedBackOffPolicy) {
5151
this.numberOfvaluesToCreate = getMaxAttemps(providedMaxAttempts) - 1;
52-
BackOffPolicy backOffPolicy = providedBackOffPolicy != null ? providedBackOffPolicy : DEFAULT_BACKOFF_POLICY;
53-
checkBackOffPolicyTipe(backOffPolicy);
54-
this.backOffPolicy = backOffPolicy;
52+
BackOffPolicy policy = providedBackOffPolicy != null ? providedBackOffPolicy : DEFAULT_BACKOFF_POLICY;
53+
checkBackOffPolicyTipe(policy);
54+
this.backOffPolicy = policy;
5555
}
5656

5757
public int getMaxAttemps(int providedMaxAttempts) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,12 @@ public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destination
5858
@SuppressWarnings("unchecked")
5959
public DeadLetterPublishingRecoverer create(Configuration configuration) {
6060
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(configuration.template,
61-
((cr, e) -> this.resolveDestination(cr, e, configuration))) {
61+
((cr, e) -> this.resolveDestination(cr, e))) {
62+
6263
@Override
63-
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
64+
protected void publish(ProducerRecord<Object, Object> outRecord,
65+
KafkaOperations<Object, Object> kafkaTemplate) {
66+
6467
if (NO_OPS_RETRY_TOPIC.equals(outRecord.topic())) {
6568
this.logger.warn(() -> "Processing failed for last topic, giving up.");
6669
return;
@@ -72,6 +75,7 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
7275
.getKafkaOperations();
7376
super.publish(outRecord, kafkaOperationsForTopic);
7477
}
78+
7579
};
7680

7781
recoverer.setHeadersFunction((consumerRecord, e) -> addHeaders(consumerRecord, e, getAttempts(consumerRecord)));
@@ -83,7 +87,7 @@ public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublis
8387
this.recovererCustomizer = customizer;
8488
}
8589

86-
private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e, Configuration configuration) {
90+
private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
8791
if (isBackoffException(e)) {
8892
throw (NestedRuntimeException) e; // Necessary to not commit the offset and seek to current again
8993
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,12 @@ public class ListenerContainerFactoryConfigurer {
5757
configuredFactoriesCache = new HashSet<>();
5858
}
5959

60-
private final static String INTERNAL_KAFKA_CONSUMER_BACKOFF_BEAN_NAME = "kafkaconsumerbackoff-internal";
61-
private final static long DEFAULT_IDLE_PARTITION_EVENT_INTERVAL = 1000L;
60+
private static final long DEFAULT_IDLE_PARTITION_EVENT_INTERVAL = 1000L;
61+
6262
private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
63+
6364
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> { };
65+
6466
private Consumer<ErrorHandler> errorHandlerCustomizer = errorHandler -> { };
6567

6668
ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
@@ -94,6 +96,7 @@ private void addToFactoriesCache(ConcurrentKafkaListenerContainerFactory<?, ?> c
9496
}
9597

9698
public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
99+
Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
97100
this.containerCustomizer = containerCustomizer;
98101
}
99102

0 commit comments

Comments
 (0)