Skip to content

Commit d005a2a

Browse files
tomazfernandesgaryrussell
authored andcommitted
GH-1715,GH-1717: GH-920 Polishing
Fixes GH-1715. Fixes GH-1717. Remove leading space in javadoRemove leading space in javadocc
1 parent 120ca48 commit d005a2a

10 files changed

+235
-103
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,8 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
411411
resolveContainerFactory(kafkaListener, resolve(kafkaListener.containerFactory()), beanName);
412412

413413
getRetryTopicConfigurer()
414-
.processMainAndRetryListeners(endpointProcessor, endpoint, retryTopicConfiguration, this.registrar, factory);
414+
.processMainAndRetryListeners(endpointProcessor, endpoint, retryTopicConfiguration,
415+
this.registrar, factory, this.defaultContainerFactoryBeanName);
415416

416417
this.listenerScope.removeListener(beanRef);
417418
return true;

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public class RetryableTopicAnnotationProcessor {
6060
PARSER = new SpelExpressionParser();
6161
}
6262

63+
private final static String DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME = "kafkaTemplate";
64+
6365
public RetryableTopicAnnotationProcessor(BeanFactory beanFactory) {
6466
this.beanFactory = beanFactory;
6567
}
@@ -163,7 +165,12 @@ private RetryTopicConfigurer.EndpointHandlerMethod getDltProcessor(Method listen
163165
return this.beanFactory.getBean(RetryTopicInternalBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class);
164166
}
165167
catch (NoSuchBeanDefinitionException ex) {
166-
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", ex);
168+
try {
169+
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
170+
}
171+
catch (NoSuchBeanDefinitionException exc) {
172+
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", exc);
173+
}
167174
}
168175
}
169176
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ public int hashCode() {
190190
return Objects.hash(this.delayMs, this.suffix, this.type, this.maxAttempts, this.numPartitions,
191191
this.dltStrategy, this.kafkaOperations);
192192
}
193+
194+
public boolean isMainEndpoint() {
195+
return Type.MAIN.equals(this.type);
196+
}
193197
}
194198

195199
enum Type {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopicPropertiesFactory.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,12 @@ public List<DestinationTopic.Properties> createProperties() {
8888
}
8989

9090
private List<DestinationTopic.Properties> createPropertiesForFixedDelaySingleTopic() {
91-
return Arrays.asList(createMainTopicProperties(), createRetryProperties(1,
92-
DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()), createDltProperties());
91+
return isNoDltStrategy()
92+
? Arrays.asList(createMainTopicProperties(),
93+
createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()))
94+
: Arrays.asList(createMainTopicProperties(),
95+
createRetryProperties(1, DestinationTopic.Type.SINGLE_TOPIC_RETRY, getShouldRetryOn()),
96+
createDltProperties());
9397
}
9498

9599
private boolean isSingleTopicFixedDelay() {
@@ -101,26 +105,29 @@ private boolean isSingleTopicStrategy() {
101105
}
102106

103107
private List<DestinationTopic.Properties> createPropertiesForDefaultTopicStrategy() {
104-
return IntStream.rangeClosed(0, DltStrategy.NO_DLT.equals(this.dltStrategy)
108+
return IntStream.rangeClosed(0, isNoDltStrategy()
105109
? this.maxAttempts - 1
106110
: this.maxAttempts)
107-
.mapToObj(index -> createRetryOrDltTopicSuffixes(index))
111+
.mapToObj(this::createRetryOrDltTopicSuffixes)
108112
.collect(Collectors.toList());
109113
}
110114

111-
private DestinationTopic.Properties createMainTopicProperties() {
112-
return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts,
113-
this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout);
115+
private boolean isNoDltStrategy() {
116+
return DltStrategy.NO_DLT.equals(this.dltStrategy);
114117
}
115118

116119
private DestinationTopic.Properties createRetryOrDltTopicSuffixes(int index) {
117120
BiPredicate<Integer, Throwable> shouldRetryOn = getShouldRetryOn();
118121
return index == 0
119122
? createMainTopicProperties()
120123
: index < this.maxAttempts
121-
? createRetryProperties(index,
122-
DestinationTopic.Type.RETRY, shouldRetryOn)
123-
: createDltProperties();
124+
? createRetryProperties(index, DestinationTopic.Type.RETRY, shouldRetryOn)
125+
: createDltProperties();
126+
}
127+
128+
private DestinationTopic.Properties createMainTopicProperties() {
129+
return new DestinationTopic.Properties(0, MAIN_TOPIC_SUFFIX, DestinationTopic.Type.MAIN, this.maxAttempts,
130+
this.numPartitions, this.dltStrategy, this.kafkaOperations, getShouldRetryOn(), this.timeout);
124131
}
125132

126133
private DestinationTopic.Properties createDltProperties() {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryResolver.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,17 +59,15 @@ public class ListenerContainerFactoryResolver {
5959
this.retryEndpointCache = new Cache();
6060

6161
this.mainEndpointResolvers = Arrays.asList(
62-
(fromKafkaListenerAnnotation, configuration) -> this.mainEndpointCache
63-
.fromCache(fromKafkaListenerAnnotation, configuration),
62+
this.mainEndpointCache::fromCache,
6463
(fromKafkaListenerAnnotation, configuration) -> fromKafkaListenerAnnotation,
6564
(fromKLAnnotation, configuration) -> configuration.factoryFromRetryTopicConfiguration,
6665
(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
6766
(fromKLAnnotation, configuration) ->
6867
fromBeanName(RetryTopicInternalBeanNames.DEFAULT_LISTENER_FACTORY_BEAN_NAME));
6968

7069
this.retryEndpointResolvers = Arrays.asList(
71-
(fromKafkaListenerAnnotation, configuration) ->
72-
this.retryEndpointCache.fromCache(fromKafkaListenerAnnotation, configuration),
70+
this.retryEndpointCache::fromCache,
7371
(fromKLAnnotation, configuration) -> configuration.factoryFromRetryTopicConfiguration,
7472
(fromKLAnnotation, configuration) -> fromBeanName(configuration.listenerContainerFactoryName),
7573
(fromKLAnnotation, configuration) -> fromKLAnnotation,
@@ -78,19 +76,38 @@ public class ListenerContainerFactoryResolver {
7876
}
7977

8078
ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForMainEndpoint(
81-
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation, Configuration config) {
79+
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
80+
String defaultContainerFactoryBeanName,
81+
Configuration config) {
82+
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
83+
getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
8284
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.mainEndpointResolvers,
8385
factoryFromKafkaListenerAnnotation, config);
8486
return this.mainEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
8587
}
8688

8789
ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactoryForRetryEndpoint(
88-
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation, Configuration config) {
90+
@Nullable KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
91+
String defaultContainerFactoryBeanName,
92+
Configuration config) {
93+
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
94+
getFactoryFromKLA(factoryFromKafkaListenerAnnotationInstance, defaultContainerFactoryBeanName);
8995
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = resolveFactory(this.retryEndpointResolvers,
9096
factoryFromKafkaListenerAnnotation, config);
9197
return this.retryEndpointCache.addIfAbsent(factoryFromKafkaListenerAnnotation, config, resolvedFactory);
9298
}
9399

100+
@Nullable
101+
private KafkaListenerContainerFactory<?> getFactoryFromKLA(KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotationInstance,
102+
String defaultContainerFactoryBeanName) {
103+
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation =
104+
factoryFromKafkaListenerAnnotationInstance;
105+
if (factoryFromKafkaListenerAnnotation == null) {
106+
factoryFromKafkaListenerAnnotation = fromBeanName(defaultContainerFactoryBeanName);
107+
}
108+
return factoryFromKafkaListenerAnnotation;
109+
}
110+
94111
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveFactory(List<FactoryResolver> factoryResolvers,
95112
KafkaListenerContainerFactory<?> factoryFromKafkaListenerAnnotation,
96113
Configuration config) {

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 45 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -239,72 +239,71 @@ public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
239239
/**
240240
* Entrypoint for creating and configuring the retry and dlt endpoints, as well as the
241241
* container factory that will create the corresponding listenerContainer.
242-
*
243242
* @param endpointProcessor function that will process the endpoints
244243
* processListener method.
245244
* @param mainEndpoint the endpoint based on which retry and dlt endpoints are also
246245
* created and processed.
247246
* @param configuration the configuration for the topic.
248-
* @param registrar The {@link KafkaListenerEndpointRegistrar} that will register the
249-
* endpoints.
247+
* @param registrar The {@link KafkaListenerEndpointRegistrar} that will register the endpoints.
250248
* @param factory The factory provided in the {@link org.springframework.kafka.annotation.KafkaListener}
251-
* annotation, if any.
249+
* @param defaultContainerFactoryBeanName The default factory bean name for the
250+
* {@link org.springframework.kafka.annotation.KafkaListener}
252251
*
253252
*/
254253
public void processMainAndRetryListeners(EndpointProcessor endpointProcessor,
255254
MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
256255
RetryTopicConfiguration configuration,
257256
KafkaListenerEndpointRegistrar registrar,
258-
@Nullable KafkaListenerContainerFactory<?> factory) {
257+
@Nullable KafkaListenerContainerFactory<?> factory,
258+
String defaultContainerFactoryBeanName) {
259259
throwIfMultiMethodEndpoint(mainEndpoint);
260-
configureMainEndpoint(mainEndpoint, endpointProcessor, registrar, factory, configuration);
261260
DestinationTopicProcessor.Context context =
262261
new DestinationTopicProcessor.Context(configuration.getDestinationTopicProperties());
263-
configureRetryAndDltEndpoints(mainEndpoint, endpointProcessor, factory, registrar, configuration, context);
262+
configureEndpoints(mainEndpoint, endpointProcessor, factory, registrar, configuration, context,
263+
defaultContainerFactoryBeanName);
264264
this.destinationTopicProcessor.processRegisteredDestinations(getTopicCreationFunction(configuration), context);
265265
}
266266

267-
private void configureMainEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
267+
private void configureEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
268268
EndpointProcessor endpointProcessor,
269-
KafkaListenerEndpointRegistrar registrar,
270269
KafkaListenerContainerFactory<?> factory,
271-
RetryTopicConfiguration configuration) {
272-
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
273-
resolveAndConfigureFactoryForMainEndpoint(factory, configuration);
274-
endpointProcessor.process(mainEndpoint);
275-
registrar.registerEndpoint(mainEndpoint, resolvedFactory);
276-
mainEndpoint.setBeanFactory(this.beanFactory);
277-
}
278-
279-
private void configureRetryAndDltEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
280-
EndpointProcessor endpointProcessor,
281-
KafkaListenerContainerFactory<?> factory,
282-
KafkaListenerEndpointRegistrar registrar,
283-
RetryTopicConfiguration configuration,
284-
DestinationTopicProcessor.Context context) {
285-
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
286-
resolveAndConfigureFactoryForRetryEndpoint(factory, configuration);
287-
270+
KafkaListenerEndpointRegistrar registrar,
271+
RetryTopicConfiguration configuration,
272+
DestinationTopicProcessor.Context context,
273+
String defaultContainerFactoryBeanName) {
288274
this.destinationTopicProcessor
289275
.processDestinationTopicProperties(destinationTopicProperties ->
290-
processAndRegisterRetryDltDestination(mainEndpoint,
276+
processAndRegisterEndpoints(mainEndpoint,
291277
endpointProcessor,
292-
resolvedFactory,
278+
factory,
279+
defaultContainerFactoryBeanName,
293280
registrar,
294281
configuration,
295282
context,
296283
destinationTopicProperties),
297284
context);
298285
}
299286

300-
private void processAndRegisterRetryDltDestination(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor,
301-
KafkaListenerContainerFactory<?> resolvedFactory, KafkaListenerEndpointRegistrar registrar,
302-
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
303-
DestinationTopic.Properties destinationTopicProperties) {
304-
MethodKafkaListenerEndpoint<Object, Object> endpoint = new MethodKafkaListenerEndpoint<>();
287+
private void processAndRegisterEndpoints(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, EndpointProcessor endpointProcessor,
288+
KafkaListenerContainerFactory<?> factory,
289+
String defaultFactoryBeanName,
290+
KafkaListenerEndpointRegistrar registrar,
291+
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
292+
DestinationTopic.Properties destinationTopicProperties) {
293+
294+
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
295+
destinationTopicProperties.isMainEndpoint()
296+
? resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration)
297+
: resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
298+
299+
MethodKafkaListenerEndpoint<?, ?> endpoint = destinationTopicProperties.isMainEndpoint()
300+
? mainEndpoint
301+
: new MethodKafkaListenerEndpoint<>();
302+
305303
endpointProcessor.accept(endpoint);
306304

307-
EndpointHandlerMethod endpointBeanMethod = getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
305+
EndpointHandlerMethod endpointBeanMethod =
306+
getEndpointHandlerMethod(mainEndpoint, configuration, destinationTopicProperties);
308307

309308
createEndpointCustomizer(endpointBeanMethod, destinationTopicProperties)
310309
.customizeEndpointAndCollectTopics(endpoint)
@@ -319,7 +318,8 @@ private void processAndRegisterRetryDltDestination(MethodKafkaListenerEndpoint<?
319318
}
320319

321320
private EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint,
322-
RetryTopicConfiguration configuration, DestinationTopic.Properties props) {
321+
RetryTopicConfiguration configuration,
322+
DestinationTopic.Properties props) {
323323
EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod();
324324
EndpointHandlerMethod retryBeanMethod = new EndpointHandlerMethod(mainEndpoint.getBean(), mainEndpoint.getMethod());
325325
return props.isDltTopic() ? getDltEndpointHandlerMethodOrDefault(dltHandlerMethod) : retryBeanMethod;
@@ -355,17 +355,20 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
355355

356356
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(
357357
KafkaListenerContainerFactory<?> providedFactory,
358-
RetryTopicConfiguration configuration) {
358+
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
359359

360360
return this.listenerContainerFactoryConfigurer
361361
.configure(this.containerFactoryResolver.resolveFactoryForMainEndpoint(providedFactory,
362+
defaultFactoryBeanName,
362363
configuration.forContainerFactoryResolver()));
363364
}
364365

365366
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(KafkaListenerContainerFactory<?> providedFactory,
367+
String defaultFactoryBeanName,
366368
RetryTopicConfiguration configuration) {
367369
return this.listenerContainerFactoryConfigurer
368370
.configure(this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory,
371+
defaultFactoryBeanName,
369372
configuration.forContainerFactoryResolver()));
370373
}
371374

@@ -435,7 +438,8 @@ private EndpointCustomizer addSuffixesAndMethod(String topicSuffix, Object bean,
435438
};
436439
}
437440

438-
private Collection<TopicNamesHolder> customizeAndRegisterTopics(Suffixer suffixer, MethodKafkaListenerEndpoint<?, ?> endpoint) {
441+
private Collection<TopicNamesHolder> customizeAndRegisterTopics(Suffixer suffixer,
442+
MethodKafkaListenerEndpoint<?, ?> endpoint) {
439443
return getTopics(endpoint)
440444
.stream()
441445
.map(topic -> new TopicNamesHolder(topic, suffixer.maybeAddTo(topic)))
@@ -494,7 +498,8 @@ public EndpointHandlerMethod(Class<?> beanClass, String methodName) {
494498
this.method = Arrays.stream(ReflectionUtils.getDeclaredMethods(beanClass))
495499
.filter(mthd -> mthd.getName().equals(methodName))
496500
.findFirst()
497-
.orElseThrow(() -> new IllegalArgumentException(String.format("No method %s in class %s", methodName, beanClass)));
501+
.orElseThrow(() -> new IllegalArgumentException(
502+
String.format("No method %s in class %s", methodName, beanClass)));
498503
this.beanClass = beanClass;
499504
}
500505

@@ -528,13 +533,12 @@ static class LoggingDltListenerHandlerMethod {
528533

529534
public void logMessage(Object message) {
530535
if (message instanceof ConsumerRecord) {
531-
LOGGER.info(() -> "Received message in dlt listener: " + ListenerUtils.recordToString((ConsumerRecord<?, ?>) message));
536+
LOGGER.info(() -> "Received message in dlt listener: "
537+
+ ListenerUtils.recordToString((ConsumerRecord<?, ?>) message));
532538
}
533539
else {
534540
LOGGER.info(() -> "Received message in dlt listener.");
535541
}
536542
}
537543
}
538544
}
539-
540-

0 commit comments

Comments
 (0)