Skip to content

GH-2239: Fix Boot AutoConfiguration #2343

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.OrderComparator;
import org.springframework.core.Ordered;
Expand All @@ -83,11 +84,15 @@
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerGroupSequencer;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
import org.springframework.kafka.retrytopic.RetryTopicBeanNames;
import org.springframework.kafka.retrytopic.RetryTopicConfiguration;
import org.springframework.kafka.retrytopic.RetryTopicConfigurationSupport;
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.GenericMessageConverter;
Expand All @@ -96,6 +101,8 @@
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -516,14 +523,50 @@ private RetryTopicConfigurer getRetryTopicConfigurer() {
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}
catch (NoSuchBeanDefinitionException ex) {
this.logger.error("A 'RetryTopicConfigurer' with name "
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required.");
throw ex;
this.retryTopicConfigurer = createDefaultConfigurer();
}
}
return this.retryTopicConfigurer;
}

private RetryTopicConfigurer createDefaultConfigurer() {
if (this.applicationContext instanceof GenericApplicationContext) {
GenericApplicationContext gac = (GenericApplicationContext) this.applicationContext;
gac.registerBean(
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
RetryTopicConfigurationSupport.class,
() -> new RetryTopicConfigurationSupport());
RetryTopicConfigurationSupport rtcs = this.applicationContext.getBean(
RetryTopicBeanNames.DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME,
RetryTopicConfigurationSupport.class);
DestinationTopicResolver destResolver = rtcs.destinationTopicResolver();
RetryTopicSchedulerWrapper schedW = gac.getBeanProvider(RetryTopicSchedulerWrapper.class).getIfUnique();
TaskScheduler sched = gac.getBeanProvider(TaskScheduler.class).getIfUnique();
if (schedW == null && sched == null) {
RetryTopicSchedulerWrapper newSchedW = new RetryTopicSchedulerWrapper(new ThreadPoolTaskScheduler());
gac.registerBean(RetryTopicBeanNames.DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME,
RetryTopicSchedulerWrapper.class, () -> newSchedW);
schedW = gac.getBean(RetryTopicSchedulerWrapper.class);
}
KafkaConsumerBackoffManager bom =
rtcs.kafkaConsumerBackoffManager(this.applicationContext, this.registrar.getEndpointRegistry(),
schedW, sched);
RetryTopicConfigurer rtc = rtcs.retryTopicConfigurer(bom, destResolver, this.beanFactory);

gac.registerBean(RetryTopicBeanNames.DESTINATION_TOPIC_RESOLVER_BEAN_NAME, DestinationTopicResolver.class,
() -> destResolver);
gac.registerBean(KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME,
KafkaConsumerBackoffManager.class, () -> bom);
gac.registerBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class,
() -> rtc);

return this.beanFactory
.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME, RetryTopicConfigurer.class);
}
throw new IllegalStateException("When there is no RetryTopicConfigurationSupport bean, the application context "
+ "must be a GenericApplicationContext");
}

private Method checkProxy(Method methodArg, Object bean) {
Method method = methodArg;
if (AopUtils.isJdkDynamicProxy(bean)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
public final void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.springframework.kafka.listener;

import org.springframework.context.ApplicationContext;
import org.springframework.util.Assert;

/**
Expand All @@ -32,9 +33,13 @@ public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafk
/**
* Construct an instance with the provided properties.
* @param listenerContainerRegistry the registry.
* @param applicationContext the application context.
*/
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry,
ApplicationContext applicationContext) {

super(listenerContainerRegistry);
setApplicationContext(applicationContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
/**
* The bean names for the non-blocking topic-based delayed retries feature.
* @author Tomaz Fernandes
* @author Gary Russell
* @since 2.9
*/
public final class RetryTopicBeanNames {

private RetryTopicBeanNames() {
}

/**
* The bean name of an internally managed retry topic configuration support, if
* needed.
*/
public static final String DEFAULT_RETRY_TOPIC_CONFIG_SUPPORT_BEAN_NAME =
"org.springframework.kafka.retrytopic.internalRetryTopicConfigurationSupport";

/**
* The bean name of the internally managed retry topic configurer.
*/
Expand All @@ -50,4 +58,10 @@ private RetryTopicBeanNames() {
public static final String DEFAULT_KAFKA_TEMPLATE_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";

/**
* The bean name of the internally registered scheduler wrapper, if needed.
*/
public static final String DEFAULT_SCHEDULER_WRAPPER_BEAN_NAME =
"defaultRetryTopicKafkaTemplate";

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Clock;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
Expand Down Expand Up @@ -150,10 +151,13 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
* {@link KafkaConsumerBackoffManager} instance used to back off the partitions.
* @param registry the {@link ListenerContainerRegistry} used to fetch the
* {@link MessageListenerContainer}.
* @param applicationContext the application context.
* @return the instance.
*/
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
return new ContainerPartitionPausingBackOffManagerFactory(registry);
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry,
ApplicationContext applicationContext) {

return new ContainerPartitionPausingBackOffManagerFactory(registry, applicationContext);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class RetryTopicConfigurationSupport {

private final RetryTopicComponentFactory componentFactory = createComponentFactory();

protected RetryTopicConfigurationSupport() {
public RetryTopicConfigurationSupport() {
Assert.state(ONLY_ONE_ALLOWED.getAndSet(false), "Only one 'RetryTopicConfigurationSupport' is allowed");
}

Expand Down Expand Up @@ -266,20 +267,21 @@ protected Consumer<DestinationTopicResolver> configureDestinationTopicResolver()
* To provide a custom implementation, either override this method, or
* override the {@link RetryTopicComponentFactory#kafkaBackOffManagerFactory} method
* and return a different {@link KafkaBackOffManagerFactory}.
* @param applicationContext the application context.
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
* {@link MessageListenerContainer} at runtime to be backed off.
* @param wrapper a {@link RetryTopicSchedulerWrapper}.
* @param taskScheduler a {@link TaskScheduler}.
* @return the instance.
*/
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(ApplicationContext applicationContext,
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
ListenerContainerRegistry registry, @Nullable RetryTopicSchedulerWrapper wrapper,
@Nullable TaskScheduler taskScheduler) {

KafkaBackOffManagerFactory backOffManagerFactory =
this.componentFactory.kafkaBackOffManagerFactory(registry);
this.componentFactory.kafkaBackOffManagerFactory(registry, applicationContext);
JavaUtils.INSTANCE.acceptIfInstanceOf(ContainerPartitionPausingBackOffManagerFactory.class, backOffManagerFactory,
factory -> configurePartitionPausingFactory(factory, registry,
wrapper != null ? wrapper.getScheduler() : taskScheduler));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.mockito.ArgumentCaptor;

import org.springframework.beans.factory.BeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
Expand Down Expand Up @@ -167,7 +168,8 @@ void testCreateBackOffManager() {
KafkaConsumerBackoffManager backoffManagerMock = mock(KafkaConsumerBackoffManager.class);
TaskScheduler taskSchedulerMock = mock(TaskScheduler.class);
Clock clock = mock(Clock.class);
given(componentFactory.kafkaBackOffManagerFactory(registry)).willReturn(factory);
ApplicationContext ctx = mock(ApplicationContext.class);
given(componentFactory.kafkaBackOffManagerFactory(registry, ctx)).willReturn(factory);
given(factory.create()).willReturn(backoffManagerMock);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {

Expand All @@ -177,19 +179,21 @@ protected RetryTopicComponentFactory createComponentFactory() {
}

};
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null,
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
taskSchedulerMock);
assertThat(backoffManager).isEqualTo(backoffManagerMock);
then(componentFactory).should().kafkaBackOffManagerFactory(registry);
then(componentFactory).should().kafkaBackOffManagerFactory(registry, ctx);
then(factory).should().create();
}

@Test
void testCreateBackOffManagerNoConfiguration() {
ListenerContainerRegistry registry = mock(ListenerContainerRegistry.class);
TaskScheduler scheduler = mock(TaskScheduler.class);
ApplicationContext ctx = mock(ApplicationContext.class);
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport();
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(registry, null, scheduler);
KafkaConsumerBackoffManager backoffManager = support.kafkaConsumerBackoffManager(ctx, registry, null,
scheduler);
assertThat(backoffManager).isNotNull();
}

Expand Down