Skip to content

Commit f9cbe4e

Browse files
committed
Address PR Comments.
1 parent 631d4d8 commit f9cbe4e

11 files changed

+50
-37
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@ That being said, for consumers handling a single partition the message's process
4141

4242
IMPORTANT: It is guaranteed that a message will never be processed before its due time.
4343

44-
===== Tuning the Delay Precision
45-
46-
Starting with version 2.9, it is no longer necessary to tune the precision because a task scheduler is used to resume the partition and wake the consumer, if necessary.
47-
4844
[[retry-config]]
4945
==== Configuration
5046

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,8 @@ public class KafkaListenerAnnotationBeanPostProcessor<K, V>
186186

187187
private AnnotationEnhancer enhancer;
188188

189+
private RetryTopicConfigurer retryTopicConfigurer;
190+
189191
@Override
190192
public int getOrder() {
191193
return LOWEST_PRECEDENCE;
@@ -508,8 +510,16 @@ private boolean processMainAndRetryListeners(KafkaListener kafkaListener, Object
508510
}
509511

510512
private RetryTopicConfigurer getRetryTopicConfigurer() {
511-
return this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME,
512-
RetryTopicConfigurer.class);
513+
try {
514+
this.retryTopicConfigurer = this.beanFactory.getBean(RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME,
515+
RetryTopicConfigurer.class);
516+
}
517+
catch (NoSuchBeanDefinitionException ex) {
518+
this.logger.error("A 'RetryTopicConfigurer' with name "
519+
+ RetryTopicBeanNames.RETRY_TOPIC_CONFIGURER_BEAN_NAME + "is required.");
520+
throw ex;
521+
}
522+
return this.retryTopicConfigurer;
513523
}
514524

515525
private Method checkProxy(Method methodArg, Object bean) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public class RetryableTopicAnnotationProcessor {
7676

7777
private final BeanExpressionContext expressionContext;
7878

79-
private static final String DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME = "kafkaTemplate";
80-
8179
/**
8280
* Construct an instance using the provided parameters and default resolver,
8381
* expression context.
@@ -218,14 +216,11 @@ private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean
218216
KafkaOperations.class);
219217
}
220218
catch (NoSuchBeanDefinitionException ex2) {
221-
try {
222-
return this.beanFactory.getBean(DEFAULT_SPRING_BOOT_KAFKA_TEMPLATE_NAME, KafkaOperations.class);
223-
}
224-
catch (NoSuchBeanDefinitionException exc) {
225-
exc.addSuppressed(ex2);
226-
throw new BeanInitializationException("Could not find a KafkaTemplate to configure the retry topics.", // NOSONAR (lost stack trace)
227-
exc);
228-
}
219+
KafkaOperations<?, ?> kafkaOps = this.beanFactory.getBeanProvider(KafkaOperations.class).getIfUnique();
220+
Assert.state(kafkaOps != null, () -> "A single KafkaTemplate bean could not be found in the context; "
221+
+ " a single instance must exist, or one specifically named "
222+
+ RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME);
223+
return kafkaOps;
229224
}
230225
}
231226

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,5 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
9090
public void setApplicationContext(ApplicationContext applicationContext) {
9191
this.applicationContext = applicationContext;
9292
}
93+
9394
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPartitionPausingBackOffManager.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ public class ContainerPartitionPausingBackOffManager implements KafkaConsumerBac
5151
public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry,
5252
BackOffHandler backOffHandler) {
5353

54+
Assert.notNull(listenerContainerRegistry, "'listenerContainerRegistry' cannot be null");
55+
Assert.notNull(backOffHandler, "'backOffHandler' cannot be null");
5456
this.listenerContainerRegistry = listenerContainerRegistry;
5557
this.backOffHandler = backOffHandler;
5658
}
@@ -71,11 +73,12 @@ public void backOffIfNecessary(Context context) {
7173

7274
private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
7375
TopicPartition topicPartition = context.getTopicPartition();
74-
getListenerContainerFromContext(context).pausePartition(topicPartition);
75-
this.backOffHandler.onNextBackOff(getListenerContainerFromContext(context), topicPartition, backOffTime);
76+
MessageListenerContainer container = getListenerContainerFromContext(context);
77+
container.pausePartition(topicPartition);
78+
this.backOffHandler.onNextBackOff(container, topicPartition, backOffTime);
7679
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
77-
"backing off for approx. %s millis.", context.getTopicPartition().partition(),
78-
context.getTopicPartition().topic(), backOffTime),
80+
"backing off for approx. %s millis.", topicPartition.partition(),
81+
topicPartition.topic(), backOffTime),
7982
topicPartition, context.getListenerId(), context.getDueTimestamp());
8083
}
8184

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class ListenerContainerFactoryConfigurer {
7171
private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {
7272
};
7373

74-
private Consumer<CommonErrorHandler> errorHandlerCustomizer = errorHandler -> {
74+
private Consumer<DefaultErrorHandler> errorHandlerCustomizer = errorHandler -> {
7575
};
7676

7777
private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
@@ -152,7 +152,7 @@ public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?
152152
this.containerCustomizer = containerCustomizer;
153153
}
154154

155-
public void setErrorHandlerCustomizer(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
155+
public void setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
156156
this.errorHandlerCustomizer = errorHandlerCustomizer;
157157
}
158158

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ public BlockingRetriesConfigurer backOff(BackOff backoff) {
349349
*/
350350
public static class CustomizersConfigurer {
351351

352-
private Consumer<CommonErrorHandler> errorHandlerCustomizer;
352+
private Consumer<DefaultErrorHandler> errorHandlerCustomizer;
353353

354354
private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;
355355

@@ -362,7 +362,7 @@ public static class CustomizersConfigurer {
362362
* @return the configurer.
363363
* @see DefaultErrorHandler
364364
*/
365-
public CustomizersConfigurer customizeErrorHandler(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
365+
public CustomizersConfigurer customizeErrorHandler(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
366366
this.errorHandlerCustomizer = errorHandlerCustomizer;
367367
return this;
368368
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicSchedulerWrapper.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
* framework requires a scheduler bean and looks for one in this order: 1. A single
2929
* instance of this class, 2. a single {@link TaskScheduler} bean, 3. when multiple
3030
* {@link TaskScheduler}s are present, a bean with the name {@code taskScheduler}.
31+
* If you use this class, you should provide a {@link TaskScheduler} that is not defined
32+
* as a bean; this class will maintain the scheduler's lifecycle.
3133
*
3234
* @author Gary Russell
3335
* @since 2.9

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/ListenerContainerFactoryConfigurerTests.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class ListenerContainerFactoryConfigurerTests {
8787
private ContainerProperties containerProperties;
8888

8989
@Captor
90-
private ArgumentCaptor<CommonErrorHandler> errorHandlerCaptor;
90+
private ArgumentCaptor<DefaultErrorHandler> errorHandlerCaptor;
9191

9292
private final ConsumerRecord<?, ?> record =
9393
new ConsumerRecord<>("test-topic", 1, 1234L, new Object(), new Object());
@@ -104,7 +104,7 @@ class ListenerContainerFactoryConfigurerTests {
104104
private OffsetCommitCallback offsetCommitCallback;
105105

106106
@Mock
107-
private java.util.function.Consumer<CommonErrorHandler> errorHandlerCustomizer;
107+
private java.util.function.Consumer<DefaultErrorHandler> errorHandlerCustomizer;
108108

109109
@SuppressWarnings("rawtypes")
110110
@Captor
@@ -173,12 +173,10 @@ void shouldSetupErrorHandling() {
173173

174174
// then
175175
then(container).should(times(1)).setCommonErrorHandler(errorHandlerCaptor.capture());
176-
CommonErrorHandler errorHandler = errorHandlerCaptor.getValue();
177-
assertThat(DefaultErrorHandler.class.isAssignableFrom(errorHandler.getClass())).isTrue();
178-
DefaultErrorHandler seekToCurrent = (DefaultErrorHandler) errorHandler;
176+
DefaultErrorHandler errorHandler = errorHandlerCaptor.getValue();
179177

180178
RuntimeException ex = new RuntimeException();
181-
seekToCurrent.handleRemaining(ex, records, consumer, container);
179+
errorHandler.handleRemaining(ex, records, consumer, container);
182180

183181
then(recoverer).should(times(1)).accept(record, consumer, ex);
184182
then(consumer).should(times(1)).commitAsync(any(Map.class), eq(offsetCommitCallback));

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationSupportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,10 @@
3535
import org.mockito.ArgumentCaptor;
3636

3737
import org.springframework.beans.factory.BeanFactory;
38-
import org.springframework.kafka.listener.CommonErrorHandler;
3938
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
4039
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
4140
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
41+
import org.springframework.kafka.listener.DefaultErrorHandler;
4242
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
4343
import org.springframework.kafka.listener.ListenerContainerRegistry;
4444
import org.springframework.kafka.support.converter.ConversionException;
@@ -87,7 +87,7 @@ void testCreateConfigurer() {
8787
Consumer<DeadLetterPublishingRecovererFactory> dlprfCustomizer = mock(Consumer.class);
8888
Consumer<RetryTopicConfigurer> rtconfigurer = mock(Consumer.class);
8989
Consumer<ListenerContainerFactoryConfigurer> lcfcConsumer = mock(Consumer.class);
90-
Consumer<CommonErrorHandler> errorHandlerCustomizer = mock(Consumer.class);
90+
Consumer<DefaultErrorHandler> errorHandlerCustomizer = mock(Consumer.class);
9191
BackOff backoff = mock(BackOff.class);
9292

9393
RetryTopicConfigurationSupport support = new RetryTopicConfigurationSupport() {

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2122
import static org.mockito.ArgumentMatchers.anyString;
2223
import static org.mockito.BDDMockito.given;
2324
import static org.mockito.BDDMockito.willAnswer;
@@ -33,6 +34,7 @@
3334

3435
import org.springframework.beans.factory.BeanInitializationException;
3536
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
37+
import org.springframework.beans.factory.ObjectProvider;
3638
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
3739
import org.springframework.core.annotation.AnnotationUtils;
3840
import org.springframework.kafka.annotation.DltHandler;
@@ -166,13 +168,16 @@ void shouldThrowIfNoKafkaTemplateFound() {
166168
given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class))
167169
.willThrow(NoSuchBeanDefinitionException.class);
168170

169-
given(this.beanFactory.getBean("kafkaTemplate", KafkaOperations.class))
170-
.willThrow(NoSuchBeanDefinitionException.class);
171+
@SuppressWarnings({ "unchecked", "rawtypes" })
172+
ObjectProvider<KafkaOperations> templateProvider = mock(ObjectProvider.class);
173+
given(templateProvider.getIfUnique()).willReturn(null);
174+
given(this.beanFactory.getBeanProvider(KafkaOperations.class))
175+
.willReturn(templateProvider);
171176

172177
RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory);
173178

174179
// given - then
175-
assertThatExceptionOfType(BeanInitializationException.class).isThrownBy(() ->
180+
assertThatIllegalStateException().isThrownBy(() ->
176181
processor.processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt));
177182
}
178183

@@ -182,8 +187,11 @@ void shouldTrySpringBootDefaultKafkaTemplate() {
182187
// setup
183188
given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class))
184189
.willThrow(NoSuchBeanDefinitionException.class);
185-
given(this.beanFactory.getBean("kafkaTemplate", KafkaOperations.class))
186-
.willReturn(kafkaOperationsFromDefaultName);
190+
@SuppressWarnings({ "unchecked", "rawtypes" })
191+
ObjectProvider<KafkaOperations> templateProvider = mock(ObjectProvider.class);
192+
given(templateProvider.getIfUnique()).willReturn(kafkaOperationsFromDefaultName);
193+
given(this.beanFactory.getBeanProvider(KafkaOperations.class))
194+
.willReturn(templateProvider);
187195
RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory);
188196

189197
// given - then

0 commit comments

Comments
 (0)