Skip to content

Commit 9f869b6

Browse files
committed
Remove legacy code.
Also fix unrelated race in EKIT. Only allow one `RetryTemplateConfigurationSupport` bean.
1 parent 9acdbde commit 9f869b6

29 files changed

+254
-2032
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@
5858
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
5959
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
6060
import org.springframework.beans.factory.config.Scope;
61-
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
62-
import org.springframework.beans.factory.support.RootBeanDefinition;
6361
import org.springframework.context.ApplicationContext;
6462
import org.springframework.context.ApplicationContextAware;
6563
import org.springframework.context.ConfigurableApplicationContext;
@@ -510,29 +508,11 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
510508
}
511509

512510
private RetryTopicConfigurer getRetryTopicConfigurer() {
513-
bootstrapRetryTopicIfNecessary();
514511
return this.beanFactory.containsBean("internalRetryTopicConfigurer")
515512
? this.beanFactory.getBean("internalRetryTopicConfigurer", RetryTopicConfigurer.class)
516513
: this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
517514
}
518515

519-
@SuppressWarnings("deprecation")
520-
private void bootstrapRetryTopicIfNecessary() {
521-
if (!(this.beanFactory instanceof BeanDefinitionRegistry)) {
522-
throw new IllegalStateException("BeanFactory must be an instance of "
523-
+ BeanDefinitionRegistry.class.getSimpleName()
524-
+ " to bootstrap the RetryTopic functionality. Provided beanFactory: "
525-
+ this.beanFactory.getClass().getSimpleName());
526-
}
527-
BeanDefinitionRegistry registry = (BeanDefinitionRegistry) this.beanFactory;
528-
if (!registry.containsBeanDefinition("internalRetryTopicBootstrapper")) {
529-
registry.registerBeanDefinition("internalRetryTopicBootstrapper",
530-
new RootBeanDefinition(org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class));
531-
this.beanFactory.getBean("internalRetryTopicBootstrapper",
532-
org.springframework.kafka.retrytopic.RetryTopicBootstrapper.class).bootstrapRetryTopic();
533-
}
534-
}
535-
536516
private Method checkProxy(Method methodArg, Object bean) {
537517
Method method = methodArg;
538518
if (AopUtils.isJdkDynamicProxy(bean)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
* Base class for {@link KafkaBackOffManagerFactory} implementations.
2626
*
2727
* @author Tomaz Fernandes
28+
* @author Gary Russell
2829
* @since 2.7
2930
* @see KafkaConsumerBackoffManager
3031
*/
@@ -35,24 +36,23 @@ public abstract class AbstractKafkaBackOffManagerFactory
3536

3637
private ListenerContainerRegistry listenerContainerRegistry;
3738

39+
/**
40+
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
41+
* the {@link ApplicationContext}.
42+
*/
43+
public AbstractKafkaBackOffManagerFactory() {
44+
this.listenerContainerRegistry = null;
45+
}
46+
3847
/**
3948
* Creates an instance with the provided {@link ListenerContainerRegistry},
4049
* which will be used to fetch the {@link MessageListenerContainer} to back off.
41-
4250
* @param listenerContainerRegistry the listenerContainerRegistry to use.
4351
*/
4452
public AbstractKafkaBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
4553
this.listenerContainerRegistry = listenerContainerRegistry;
4654
}
4755

48-
/**
49-
* Creates an instance that will retrieve the {@link ListenerContainerRegistry} from
50-
* the {@link ApplicationContext}.
51-
*/
52-
public AbstractKafkaBackOffManagerFactory() {
53-
this.listenerContainerRegistry = null;
54-
}
55-
5656
/**
5757
* Sets the {@link ListenerContainerRegistry}, that will be used to fetch the
5858
* {@link MessageListenerContainer} to back off.

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ public class ContainerPartitionPausingBackOffManager implements KafkaConsumerBac
4545

4646
/**
4747
* Construct an instance with the provided registry and back off handler.
48-
* @param listenerContainerRegistry
49-
* @param backOffHandler
48+
* @param listenerContainerRegistry the registry.
49+
* @param backOffHandler the handler.
5050
*/
5151
public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry,
5252
BackOffHandler backOffHandler) {

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManagerFactory.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.springframework.util.Assert;
20+
1921
/**
2022
* A factory for {@link ContainerPartitionPausingBackoffManager}.
2123
*
@@ -30,18 +32,21 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk
3032
/**
3133
* Construct an instance with the provided properties.
3234
* @param listenerContainerRegistry the registry.
33-
* @param backOffHandler the back off handler.
3435
*/
3536
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
36-
3737
super(listenerContainerRegistry);
3838
}
3939

4040
@Override
4141
protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) {
42+
Assert.notNull(this.backOffHandler, "a BackOffHandler is required");
4243
return new ContainerPartitionPausingBackOffManager(getListenerContainerRegistry(), this.backOffHandler);
4344
}
4445

46+
/**
47+
* Set the back off handler to use in the created handlers.
48+
* @param backOffHandler the handler.
49+
*/
4550
public void setBackOffHandler(BackOffHandler backOffHandler) {
4651
this.backOffHandler = backOffHandler;
4752
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerTimingAdjuster.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java

Lines changed: 0 additions & 177 deletions
This file was deleted.

0 commit comments

Comments
 (0)