Skip to content

Commit 469edfa

Browse files
GH-2069: Same factory for retryable and normal endpoints (#2108)
* GH-2069: Same factory for retryable and normal endpoints Resolves #2069 Originally the retryable topic's ListenerContainerFactory configuration would interfere with non-retryable topics. Now we do not set the feature's factory configurations if the container is not retryable. * Add and update javadoc as requested in code review
1 parent 19c6b23 commit 469edfa

File tree

7 files changed

+643
-19
lines changed

7 files changed

+643
-19
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,8 +668,6 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
668668

669669
By default the RetryTopic configuration will use the provided factory from the `@KafkaListener` annotation, but you can specify a different one to be used to create the retry topic and dlt listener containers.
670670

671-
IMPORTANT: The provided factory will be configured for the retry topic functionality, so you should not use the same factory for both retrying and non-retrying topics. You can however share the same factory between many retry topic configurations.
672-
673671
For the `@RetryableTopic` annotation you can provide the factory's bean name, and using the `RetryTopicConfiguration` bean you can either provide the bean name or the instance itself.
674672

675673
====
@@ -703,6 +701,27 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
703701
----
704702
====
705703

704+
705+
IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.
706+
707+
If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:
708+
709+
====
710+
[source, java]
711+
----
712+
713+
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
714+
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
715+
ListenerContainerFactoryResolver containerFactoryResolver,
716+
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
717+
BeanFactory beanFactory,
718+
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
719+
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
720+
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
721+
return retryTopicConfigurer;
722+
}
723+
----
724+
706725
[[change-kboe-logging-level]]
707726
==== Changing KafkaBackOffException Logging Level
708727

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@
2323
import java.util.List;
2424
import java.util.Set;
2525
import java.util.function.Consumer;
26+
import java.util.regex.Pattern;
2627

2728
import org.apache.commons.logging.LogFactory;
2829

2930
import org.springframework.beans.factory.annotation.Qualifier;
3031
import org.springframework.core.log.LogAccessor;
3132
import org.springframework.kafka.KafkaException;
3233
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
34+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
35+
import org.springframework.kafka.config.KafkaListenerEndpoint;
3336
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
3437
import org.springframework.kafka.listener.CommonErrorHandler;
3538
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -38,18 +41,23 @@
3841
import org.springframework.kafka.listener.DefaultErrorHandler;
3942
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
4043
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
44+
import org.springframework.kafka.support.TopicPartitionOffset;
4145
import org.springframework.util.Assert;
4246
import org.springframework.util.backoff.FixedBackOff;
4347

4448
/**
4549
*
46-
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory} with a
47-
* {@link DefaultErrorHandler}, the {@link DeadLetterPublishingRecoverer} created by
48-
* the {@link DeadLetterPublishingRecovererFactory}.
50+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory} to add a
51+
* {@link DefaultErrorHandler} and the {@link DeadLetterPublishingRecoverer}
52+
* created by the {@link DeadLetterPublishingRecovererFactory}.
4953
*
50-
* Mind that the same factory can be used by many different
51-
* {@link org.springframework.kafka.annotation.RetryableTopic}s but should not be shared
52-
* with non retryable topics as some of their configurations will be overriden.
54+
* Also sets {@link ContainerProperties#setIdlePartitionEventInterval(Long)}
55+
* and {@link ContainerProperties#setPollTimeout(long)} if its defaults haven't
56+
* been overridden by the user.
57+
*
58+
* Since 2.8.3 these configurations don't interfere with the provided factory
59+
* instance itself, so the same factory instance can be shared among retryable and
60+
* non-retryable endpoints.
5361
*
5462
* @author Tomaz Fernandes
5563
* @since 2.7
@@ -95,20 +103,62 @@ public class ListenerContainerFactoryConfigurer {
95103
this.clock = clock;
96104
}
97105

106+
/**
107+
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
108+
* @param containerFactory the factory instance to be configured.
109+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
110+
* @return the configured factory instance.
111+
* @deprecated in favor of
112+
* {@link #decorateFactory(ConcurrentKafkaListenerContainerFactory, Configuration)}.
113+
*/
114+
@Deprecated
98115
public ConcurrentKafkaListenerContainerFactory<?, ?> configure(
99116
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
100117
return isCached(containerFactory)
101118
? containerFactory
102119
: addToCache(doConfigure(containerFactory, configuration.backOffValues));
103120
}
104121

122+
/**
123+
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
124+
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
125+
* @param containerFactory the factory instance to be configured.
126+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
127+
* @return the configured factory instance.
128+
* @deprecated in favor of
129+
* {@link #decorateFactoryWithoutBackOffValues(ConcurrentKafkaListenerContainerFactory, Configuration)}.
130+
*/
131+
@Deprecated
105132
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
106133
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
107134
return isCached(containerFactory)
108135
? containerFactory
109136
: doConfigure(containerFactory, Collections.emptyList());
110137
}
111138

139+
/**
140+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
141+
* @param factory the factory instance to be decorated.
142+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
143+
* @return the decorated factory instance.
144+
*/
145+
public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerContainerFactory<?, ?> factory,
146+
Configuration configuration) {
147+
return new RetryTopicListenerContainerFactoryDecorator(factory, configuration.backOffValues);
148+
}
149+
150+
/**
151+
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
152+
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
153+
* @param factory the factory instance to be decorated.
154+
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
155+
* @return the decorated factory instance.
156+
*/
157+
public KafkaListenerContainerFactory<?> decorateFactoryWithoutBackOffValues(
158+
ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
159+
return new RetryTopicListenerContainerFactoryDecorator(factory, Collections.emptyList());
160+
}
161+
112162
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
113163
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, List<Long> backOffValues) {
114164

@@ -217,6 +267,45 @@ private <T> T checkAndCast(Object obj, Class<T> clazz) {
217267
return (T) obj;
218268
}
219269

270+
private class RetryTopicListenerContainerFactoryDecorator implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {
271+
272+
private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
273+
private final List<Long> backOffValues;
274+
275+
RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate, List<Long> backOffValues) {
276+
this.delegate = delegate;
277+
this.backOffValues = backOffValues;
278+
}
279+
280+
@Override
281+
public ConcurrentMessageListenerContainer<?, ?> createListenerContainer(KafkaListenerEndpoint endpoint) {
282+
return decorate(this.delegate.createListenerContainer(endpoint));
283+
}
284+
285+
private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
286+
setupBackoffAwareMessageListenerAdapter(listenerContainer, this.backOffValues);
287+
listenerContainer
288+
.setCommonErrorHandler(createErrorHandler(
289+
ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create()));
290+
return listenerContainer;
291+
}
292+
293+
@Override
294+
public ConcurrentMessageListenerContainer<?, ?> createContainer(TopicPartitionOffset... topicPartitions) {
295+
return decorate(this.delegate.createContainer(topicPartitions));
296+
}
297+
298+
@Override
299+
public ConcurrentMessageListenerContainer<?, ?> createContainer(String... topics) {
300+
return decorate(this.delegate.createContainer(topics));
301+
}
302+
303+
@Override
304+
public ConcurrentMessageListenerContainer<?, ?> createContainer(Pattern topicPattern) {
305+
return decorate(this.delegate.createContainer(topicPattern));
306+
}
307+
}
308+
220309
static class Configuration {
221310

222311
private final List<Long> backOffValues;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ public class RetryTopicConfigurer {
220220

221221
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
222222

223+
private boolean useLegacyFactoryConfigurer = false;
224+
223225
/**
224226
* Create an instance with the provided properties.
225227
* @param destinationTopicProcessor the destination topic processor.
@@ -297,7 +299,7 @@ private void processAndRegisterEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEn
297299
RetryTopicConfiguration configuration, DestinationTopicProcessor.Context context,
298300
DestinationTopic.Properties destinationTopicProperties) {
299301

300-
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
302+
KafkaListenerContainerFactory<?> resolvedFactory =
301303
destinationTopicProperties.isMainEndpoint()
302304
? resolveAndConfigureFactoryForMainEndpoint(factory, defaultFactoryBeanName, configuration)
303305
: resolveAndConfigureFactoryForRetryEndpoint(factory, defaultFactoryBeanName, configuration);
@@ -360,25 +362,32 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
360362
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
361363
}
362364

363-
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForMainEndpoint(
365+
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
364366
KafkaListenerContainerFactory<?> providedFactory,
365367
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
366368
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver
367369
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
368370
configuration.forContainerFactoryResolver());
369-
return this.listenerContainerFactoryConfigurer
370-
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
371+
372+
return this.useLegacyFactoryConfigurer
373+
? this.listenerContainerFactoryConfigurer
374+
.configureWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer())
375+
: this.listenerContainerFactoryConfigurer
376+
.decorateFactoryWithoutBackOffValues(resolvedFactory, configuration.forContainerFactoryConfigurer());
371377
}
372378

373-
private ConcurrentKafkaListenerContainerFactory<?, ?> resolveAndConfigureFactoryForRetryEndpoint(
379+
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(
374380
KafkaListenerContainerFactory<?> providedFactory,
375381
String defaultFactoryBeanName,
376382
RetryTopicConfiguration configuration) {
377383
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
378384
this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName,
379385
configuration.forContainerFactoryResolver());
380-
return this.listenerContainerFactoryConfigurer
381-
.configure(resolvedFactory, configuration.forContainerFactoryConfigurer());
386+
return this.useLegacyFactoryConfigurer
387+
? this.listenerContainerFactoryConfigurer.configure(resolvedFactory,
388+
configuration.forContainerFactoryConfigurer())
389+
: this.listenerContainerFactoryConfigurer
390+
.decorateFactory(resolvedFactory, configuration.forContainerFactoryConfigurer());
382391
}
383392

384393
private void throwIfMultiMethodEndpoint(MethodKafkaListenerEndpoint<?, ?> mainEndpoint) {
@@ -395,6 +404,17 @@ public static EndpointHandlerMethod createHandlerMethodWith(Object bean, Method
395404
return new EndpointHandlerMethod(bean, method);
396405
}
397406

407+
/**
408+
* Set to true if you want the {@link ListenerContainerFactoryConfigurer} to
409+
* behave as before 2.8.3.
410+
* @param useLegacyFactoryConfigurer Whether to use the legacy factory configuration.
411+
* @deprecated for removal after the deprecated legacy configuration methods are removed.
412+
*/
413+
@Deprecated
414+
public void useLegacyFactoryConfigurer(boolean useLegacyFactoryConfigurer) {
415+
this.useLegacyFactoryConfigurer = useLegacyFactoryConfigurer;
416+
}
417+
398418
public interface EndpointProcessor extends Consumer<MethodKafkaListenerEndpoint<?, ?>> {
399419

400420
default void process(MethodKafkaListenerEndpoint<?, ?> listenerEndpoint) {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import static org.mockito.ArgumentMatchers.eq;
2323
import static org.mockito.BDDMockito.given;
2424
import static org.mockito.BDDMockito.then;
25+
import static org.mockito.BDDMockito.willReturn;
2526
import static org.mockito.Mockito.never;
2627
import static org.mockito.Mockito.times;
2728

@@ -46,6 +47,8 @@
4647

4748
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
4849
import org.springframework.kafka.config.ContainerCustomizer;
50+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
51+
import org.springframework.kafka.config.KafkaListenerEndpoint;
4952
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
5053
import org.springframework.kafka.listener.CommonErrorHandler;
5154
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@@ -132,6 +135,9 @@ class ListenerContainerFactoryConfigurerTests {
132135
@Mock
133136
private RetryTopicConfiguration configuration;
134137

138+
@Mock
139+
private KafkaListenerEndpoint endpoint;
140+
135141
private final long backOffValue = 2000L;
136142

137143
private final ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration =
@@ -360,6 +366,44 @@ void shouldSetupMessageListenerAdapter() {
360366
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
361367
}
362368

369+
@Test
370+
void shouldDecorateFactory() {
371+
372+
// given
373+
given(container.getContainerProperties()).willReturn(containerProperties);
374+
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
375+
given(containerProperties.getMessageListener()).willReturn(listener);
376+
RecordHeaders headers = new RecordHeaders();
377+
headers.add(RetryTopicHeaders.DEFAULT_HEADER_BACKOFF_TIMESTAMP, originalTimestampBytes);
378+
given(data.headers()).willReturn(headers);
379+
String testListenerId = "testListenerId";
380+
given(container.getListenerId()).willReturn(testListenerId);
381+
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
382+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
383+
384+
// when
385+
ListenerContainerFactoryConfigurer configurer =
386+
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
387+
deadLetterPublishingRecovererFactory, clock);
388+
configurer.setContainerCustomizer(configurerContainerCustomizer);
389+
KafkaListenerContainerFactory<?> factory = configurer
390+
.decorateFactory(containerFactory, configuration.forContainerFactoryConfigurer());
391+
factory.createListenerContainer(endpoint);
392+
393+
// then
394+
then(container).should(times(1)).setupMessageListener(listenerAdapterCaptor.capture());
395+
KafkaBackoffAwareMessageListenerAdapter<?, ?> listenerAdapter =
396+
(KafkaBackoffAwareMessageListenerAdapter<?, ?>) listenerAdapterCaptor.getValue();
397+
listenerAdapter.onMessage(data, ack, consumer);
398+
399+
then(this.kafkaConsumerBackoffManager).should(times(1))
400+
.createContext(anyLong(), listenerIdCaptor.capture(), any(TopicPartition.class), eq(consumer));
401+
assertThat(listenerIdCaptor.getValue()).isEqualTo(testListenerId);
402+
then(listener).should(times(1)).onMessage(data, ack, consumer);
403+
404+
then(this.configurerContainerCustomizer).should(times(1)).accept(container);
405+
}
406+
363407
@Test
364408
void shouldCacheFactoryInstances() {
365409

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -224,9 +224,9 @@ void shouldConfigureRetryEndpoints() {
224224

225225
willReturn(containerFactory).given(containerFactoryResolver).resolveFactoryForRetryEndpoint(containerFactory,
226226
defaultFactoryBeanName, factoryResolverConfig);
227-
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configure(containerFactory,
227+
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactory(containerFactory,
228228
lcfcConfiguration);
229-
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).configureWithoutBackOffValues(containerFactory,
229+
willReturn(containerFactory).given(this.listenerContainerFactoryConfigurer).decorateFactoryWithoutBackOffValues(containerFactory,
230230
lcfcConfiguration);
231231

232232
RetryTopicConfigurer configurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver,

0 commit comments

Comments
 (0)