Skip to content

Commit c3583a4

Browse files
garyrussellsnicoll
authored andcommitted
Auto-Configure Kafka CommonErrorHandler
`ErrorHandler/BatchErrorHandler` will be deprecated in a future release in favor of `CommonErrorHandler`. Currently, the legacy handlers are adapted to a `CommonErrorHandler` or ignored if a `CommonErrorHandler` is configured. See gh-27927
1 parent 657b8ce commit c3583a4

File tree

3 files changed

+31
-1
lines changed

3 files changed

+31
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.kafka.core.KafkaTemplate;
2626
import org.springframework.kafka.listener.AfterRollbackProcessor;
2727
import org.springframework.kafka.listener.BatchErrorHandler;
28+
import org.springframework.kafka.listener.CommonErrorHandler;
2829
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
2930
import org.springframework.kafka.listener.ContainerProperties;
3031
import org.springframework.kafka.listener.ErrorHandler;
@@ -58,6 +59,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
5859

5960
private BatchErrorHandler batchErrorHandler;
6061

62+
private CommonErrorHandler commonErrorHandler;
63+
6164
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
6265

6366
private RecordInterceptor<Object, Object> recordInterceptor;
@@ -127,6 +130,15 @@ void setBatchErrorHandler(BatchErrorHandler batchErrorHandler) {
127130
this.batchErrorHandler = batchErrorHandler;
128131
}
129132

133+
/**
134+
* Set the {@link CommonErrorHandler} to use.
135+
* @param commonErrorHandler the error handler.
136+
* @since 2.6
137+
*/
138+
public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) {
139+
this.commonErrorHandler = commonErrorHandler;
140+
}
141+
130142
/**
131143
* Set the {@link AfterRollbackProcessor} to use.
132144
* @param afterRollbackProcessor the after rollback processor
@@ -171,6 +183,7 @@ private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory<Ob
171183
else {
172184
factory.setErrorHandler(this.errorHandler);
173185
}
186+
map.from(this.commonErrorHandler).to(factory::setCommonErrorHandler);
174187
map.from(this.afterRollbackProcessor).to(factory::setAfterRollbackProcessor);
175188
map.from(this.recordInterceptor).to(factory::setRecordInterceptor);
176189
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.kafka.core.KafkaTemplate;
3131
import org.springframework.kafka.listener.AfterRollbackProcessor;
3232
import org.springframework.kafka.listener.BatchErrorHandler;
33+
import org.springframework.kafka.listener.CommonErrorHandler;
3334
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
3435
import org.springframework.kafka.listener.ErrorHandler;
3536
import org.springframework.kafka.listener.RecordInterceptor;
@@ -68,6 +69,8 @@ class KafkaAnnotationDrivenConfiguration {
6869

6970
private final BatchErrorHandler batchErrorHandler;
7071

72+
private final CommonErrorHandler commonErrorHandler;
73+
7174
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
7275

7376
private final RecordInterceptor<Object, Object> recordInterceptor;
@@ -79,7 +82,7 @@ class KafkaAnnotationDrivenConfiguration {
7982
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
8083
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
8184
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener, ObjectProvider<ErrorHandler> errorHandler,
82-
ObjectProvider<BatchErrorHandler> batchErrorHandler,
85+
ObjectProvider<BatchErrorHandler> batchErrorHandler, ObjectProvider<CommonErrorHandler> commonErrorHandler,
8386
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
8487
ObjectProvider<RecordInterceptor<Object, Object>> recordInterceptor) {
8588
this.properties = properties;
@@ -92,6 +95,7 @@ class KafkaAnnotationDrivenConfiguration {
9295
this.rebalanceListener = rebalanceListener.getIfUnique();
9396
this.errorHandler = errorHandler.getIfUnique();
9497
this.batchErrorHandler = batchErrorHandler.getIfUnique();
98+
this.commonErrorHandler = commonErrorHandler.getIfUnique();
9599
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
96100
this.recordInterceptor = recordInterceptor.getIfUnique();
97101
}
@@ -110,6 +114,7 @@ ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryC
110114
configurer.setRebalanceListener(this.rebalanceListener);
111115
configurer.setErrorHandler(this.errorHandler);
112116
configurer.setBatchErrorHandler(this.batchErrorHandler);
117+
configurer.setCommonErrorHandler(this.commonErrorHandler);
113118
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
114119
configurer.setRecordInterceptor(this.recordInterceptor);
115120
return configurer;

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.springframework.kafka.core.KafkaTemplate;
5959
import org.springframework.kafka.listener.AfterRollbackProcessor;
6060
import org.springframework.kafka.listener.BatchErrorHandler;
61+
import org.springframework.kafka.listener.CommonErrorHandler;
6162
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
6263
import org.springframework.kafka.listener.ContainerProperties;
6364
import org.springframework.kafka.listener.ContainerProperties.AckMode;
@@ -546,6 +547,17 @@ void concurrentKafkaListenerContainerFactoryInBatchModeAndSimpleErrorHandlerShou
546547
});
547548
}
548549

550+
@Test
551+
void testConcurrentKafkaListenerContainerFactoryWithCustomCommonErrorHandler() {
552+
this.contextRunner.withBean("errorHandler", CommonErrorHandler.class, () -> mock(CommonErrorHandler.class))
553+
.run((context) -> {
554+
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
555+
.getBean(ConcurrentKafkaListenerContainerFactory.class);
556+
assertThat(factory).hasFieldOrPropertyWithValue("commonErrorHandler",
557+
context.getBean("errorHandler"));
558+
});
559+
}
560+
549561
@Test
550562
void testConcurrentKafkaListenerContainerFactoryWithDefaultTransactionManager() {
551563
this.contextRunner.withPropertyValues("spring.kafka.producer.transaction-id-prefix=test").run((context) -> {

0 commit comments

Comments
 (0)