Skip to content

Commit ffa3219

Browse files
committed
Remove more dead/deprecated code.
1 parent f5e7f2b commit ffa3219

File tree

9 files changed

+29
-317
lines changed

9 files changed

+29
-317
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -508,9 +508,8 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
508508
}
509509

510510
private RetryTopicConfigurer getRetryTopicConfigurer() {
511-
return this.beanFactory.containsBean("internalRetryTopicConfigurer")
512-
? this.beanFactory.getBean("internalRetryTopicConfigurer", RetryTopicConfigurer.class)
513-
: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
511+
return this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME,
512+
RetryTopicConfigurer.class);
514513
}
515514

516515
private Method checkProxy(Method methodArg, Object bean) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -214,25 +214,17 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
214214
}
215215
}
216216
try {
217-
return this.beanFactory.getBean(
218-
org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
217+
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
219218
KafkaOperations.class);
220219
}
221-
catch (NoSuchBeanDefinitionException ex) {
220+
catch (NoSuchBeanDefinitionException ex2) {
222221
try {
223-
return this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME,
224-
KafkaOperations.class);
222+
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
225223
}
226-
catch (NoSuchBeanDefinitionException ex2) {
227-
try {
228-
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
229-
}
230-
catch (NoSuchBeanDefinitionException exc) {
231-
exc.addSuppressed(ex);
232-
exc.addSuppressed(ex2);
233-
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
234-
exc);
235-
}
224+
catch (NoSuchBeanDefinitionException exc) {
225+
exc.addSuppressed(ex2);
226+
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
227+
exc);
236228
}
237229
}
238230
}

spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ public interface BackOffHandler {
3636
* @param exception the exception.
3737
* @param nextBackOff the next back off.
3838
*/
39-
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);
39+
default void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff) {
40+
throw new UnsupportedOperationException();
41+
}
4042

4143
/**
4244
* Perform the next back off for a partition.

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 1 addition & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,12 @@
1818

1919
import java.time.Clock;
2020
import java.util.Arrays;
21-
import java.util.HashSet;
2221
import java.util.List;
23-
import java.util.Set;
2422
import java.util.function.Consumer;
2523
import java.util.regex.Pattern;
2624

2725
import org.apache.commons.logging.LogFactory;
2826

29-
import org.springframework.beans.factory.annotation.Qualifier;
3027
import org.springframework.core.log.LogAccessor;
3128
import org.springframework.kafka.KafkaException;
3229
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
@@ -64,15 +61,9 @@
6461
*/
6562
public class ListenerContainerFactoryConfigurer {
6663

67-
private static final Set<ConcurrentKafkaListenerContainerFactory<?, ?>> CONFIGURED_FACTORIES_CACHE;
68-
6964
private static final LogAccessor LOGGER = new LogAccessor(
7065
LogFactory.getLog(ListenerContainerFactoryConfigurer.class));
7166

72-
static {
73-
CONFIGURED_FACTORIES_CACHE = new HashSet<>();
74-
}
75-
7667
private BackOff providedBlockingBackOff = null;
7768

7869
private Class<? extends Exception>[] blockingExceptionTypes = null;
@@ -91,45 +82,12 @@ public class ListenerContainerFactoryConfigurer {
9182

9283
public ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
9384
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
94-
@Qualifier("internalBackOffClock") Clock clock) {
85+
Clock clock) {
9586
this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
9687
this.deadLetterPublishingRecovererFactory = deadLetterPublishingRecovererFactory;
9788
this.clock = clock;
9889
}
9990

100-
/**
101-
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
102-
* @param containerFactory the factory instance to be configured.
103-
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
104-
* @return the configured factory instance.
105-
* @deprecated in favor of
106-
* {@link #decorateFactory(ConcurrentKafkaListenerContainerFactory, Configuration)}.
107-
*/
108-
@Deprecated
109-
public ConcurrentKafkaListenerContainerFactory<?, ?> configure(
110-
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
111-
return isCached(containerFactory)
112-
? containerFactory
113-
: addToCache(doConfigure(containerFactory, configuration, true));
114-
}
115-
116-
/**
117-
* Configures the provided {@link ConcurrentKafkaListenerContainerFactory}.
118-
* Meant to be used for the main endpoint, this method ignores the provided backOff values.
119-
* @param containerFactory the factory instance to be configured.
120-
* @param configuration the configuration provided by the {@link RetryTopicConfiguration}.
121-
* @return the configured factory instance.
122-
* @deprecated in favor of
123-
* {@link #decorateFactoryWithoutSettingContainerProperties(ConcurrentKafkaListenerContainerFactory, Configuration)}.
124-
*/
125-
@Deprecated
126-
public ConcurrentKafkaListenerContainerFactory<?, ?> configureWithoutBackOffValues(
127-
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration) {
128-
return isCached(containerFactory)
129-
? containerFactory
130-
: doConfigure(containerFactory, configuration, false);
131-
}
132-
13391
/**
13492
* Decorates the provided {@link ConcurrentKafkaListenerContainerFactory}.
13593
* @param factory the factory instance to be decorated.
@@ -189,30 +147,6 @@ public final void setBlockingRetryableExceptions(Class<? extends Exception>... e
189147
this.blockingExceptionTypes = Arrays.copyOf(exceptionTypes, exceptionTypes.length);
190148
}
191149

192-
private ConcurrentKafkaListenerContainerFactory<?, ?> doConfigure(
193-
ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory, Configuration configuration,
194-
boolean isSetContainerProperties) {
195-
196-
containerFactory
197-
.setContainerCustomizer(container -> setupBackoffAwareMessageListenerAdapter(container, configuration, isSetContainerProperties));
198-
containerFactory
199-
.setCommonErrorHandler(createErrorHandler(this.deadLetterPublishingRecovererFactory.create(), configuration));
200-
return containerFactory;
201-
}
202-
203-
private boolean isCached(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory) {
204-
synchronized (CONFIGURED_FACTORIES_CACHE) {
205-
return CONFIGURED_FACTORIES_CACHE.contains(containerFactory);
206-
}
207-
}
208-
209-
private ConcurrentKafkaListenerContainerFactory<?, ?> addToCache(ConcurrentKafkaListenerContainerFactory<?, ?> containerFactory) {
210-
synchronized (CONFIGURED_FACTORIES_CACHE) {
211-
CONFIGURED_FACTORIES_CACHE.add(containerFactory);
212-
return containerFactory;
213-
}
214-
}
215-
216150
public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
217151
Assert.notNull(containerCustomizer, "'containerCustomizer' cannot be null");
218152
this.containerCustomizer = containerCustomizer;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,10 +383,10 @@ private EndpointHandlerMethod getDltEndpointHandlerMethodOrDefault(EndpointHandl
383383
return dltEndpointHandlerMethod != null ? dltEndpointHandlerMethod : DEFAULT_DLT_HANDLER;
384384
}
385385

386-
@SuppressWarnings("deprecation")
387386
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoint(
388387
KafkaListenerContainerFactory<?> providedFactory,
389388
String defaultFactoryBeanName, RetryTopicConfiguration configuration) {
389+
390390
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory = this.containerFactoryResolver
391391
.resolveFactoryForMainEndpoint(providedFactory, defaultFactoryBeanName,
392392
configuration.forContainerFactoryResolver());
@@ -396,11 +396,11 @@ private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForMainEndpoi
396396
configuration.forContainerFactoryConfigurer());
397397
}
398398

399-
@SuppressWarnings("deprecation")
400399
private KafkaListenerContainerFactory<?> resolveAndConfigureFactoryForRetryEndpoint(
401400
KafkaListenerContainerFactory<?> providedFactory,
402401
String defaultFactoryBeanName,
403402
RetryTopicConfiguration configuration) {
403+
404404
ConcurrentKafkaListenerContainerFactory<?, ?> resolvedFactory =
405405
this.containerFactoryResolver.resolveFactoryForRetryEndpoint(providedFactory, defaultFactoryBeanName,
406406
configuration.forContainerFactoryResolver());

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicInternalBeanNames.java

Lines changed: 0 additions & 117 deletions
This file was deleted.

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -158,18 +158,21 @@ void shouldSetupErrorHandling() {
158158
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
159159
given(containerProperties.getAckMode()).willReturn(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
160160
given(containerProperties.getCommitCallback()).willReturn(offsetCommitCallback);
161+
given(containerProperties.getMessageListener()).willReturn(listener);
161162
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
163+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
162164

163165
// when
164166
ListenerContainerFactoryConfigurer configurer =
165167
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
166168
deadLetterPublishingRecovererFactory, clock);
167169
configurer.setErrorHandlerCustomizer(errorHandlerCustomizer);
168-
configurer
169-
.configure(containerFactory, configuration.forContainerFactoryConfigurer());
170+
KafkaListenerContainerFactory<?> factory = configurer.decorateFactory(containerFactory,
171+
configuration.forContainerFactoryConfigurer());
172+
factory.createListenerContainer(endpoint);
170173

171174
// then
172-
then(containerFactory).should(times(1)).setCommonErrorHandler(errorHandlerCaptor.capture());
175+
then(container).should(times(1)).setCommonErrorHandler(errorHandlerCaptor.capture());
173176
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
174177
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
175178
DefaultErrorHandler seekToCurrent = (DefaultErrorHandler) errorHandler;
@@ -196,22 +199,18 @@ void shouldSetupMessageListenerAdapter() {
196199
String testListenerId = "testListenerId";
197200
given(container.getListenerId()).willReturn(testListenerId);
198201
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
202+
willReturn(container).given(containerFactory).createListenerContainer(endpoint);
199203

200204
// when
201205
ListenerContainerFactoryConfigurer configurer =
202206
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
203207
deadLetterPublishingRecovererFactory, clock);
204208
configurer.setContainerCustomizer(configurerContainerCustomizer);
205-
ConcurrentKafkaListenerContainerFactory<?, ?> factory = configurer
206-
.configure(containerFactory, configuration.forContainerFactoryConfigurer());
209+
KafkaListenerContainerFactory<?> factory = configurer
210+
.decorateFactory(containerFactory, configuration.forContainerFactoryConfigurer());
211+
factory.createListenerContainer(endpoint);
207212

208213
// then
209-
then(containerFactory)
210-
.should(times(1))
211-
.setContainerCustomizer(containerCustomizerCaptor.capture());
212-
ContainerCustomizer containerCustomizer = containerCustomizerCaptor.getValue();
213-
containerCustomizer.configure(container);
214-
215214
then(container).should(times(1)).setupMessageListener(listenerAdapterCaptor.capture());
216215
KafkaBackoffAwareMessageListenerAdapter<?, ?> listenerAdapter =
217216
(KafkaBackoffAwareMessageListenerAdapter<?, ?>) listenerAdapterCaptor.getValue();
@@ -315,26 +314,4 @@ void shouldThrowIfBackOffOrRetryablesAlreadySet() {
315314
.isInstanceOf(IllegalStateException.class);
316315
}
317316

318-
319-
@Test
320-
void shouldCacheFactoryInstances() {
321-
322-
// given
323-
given(deadLetterPublishingRecovererFactory.create()).willReturn(recoverer);
324-
given(configuration.forContainerFactoryConfigurer()).willReturn(lcfcConfiguration);
325-
326-
// when
327-
ListenerContainerFactoryConfigurer configurer =
328-
new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager,
329-
deadLetterPublishingRecovererFactory, clock);
330-
ConcurrentKafkaListenerContainerFactory<?, ?> factory = configurer
331-
.configure(containerFactory, configuration.forContainerFactoryConfigurer());
332-
ConcurrentKafkaListenerContainerFactory<?, ?> secondFactory = configurer
333-
.configure(containerFactory, configuration.forContainerFactoryConfigurer());
334-
335-
// then
336-
assertThat(secondFactory).isEqualTo(factory);
337-
then(containerFactory).should(times(1)).setContainerCustomizer(any());
338-
then(containerFactory).should(times(1)).setCommonErrorHandler(any());
339-
}
340317
}

0 commit comments

Comments
 (0)