Skip to content

Commit 36a5297

Browse files
garyrussellartembilan
authored andcommitted
Deprecate Accessors for Legacy Error Handlers
**cherry-pick to 2.8.x**
1 parent 5728ed7 commit 36a5297

12 files changed

+53
-22
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2021 the original author or authors.
2+
* Copyright 2014-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -253,7 +253,10 @@ public void setReplyTemplate(KafkaTemplate<?, ?> replyTemplate) {
253253
* Set the error handler to call when the listener throws an exception.
254254
* @param errorHandler the error handler.
255255
* @since 2.2
256+
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
257+
* @see #setCommonErrorHandler(CommonErrorHandler)
256258
*/
259+
@Deprecated
257260
public void setErrorHandler(ErrorHandler errorHandler) {
258261
this.errorHandler = errorHandler;
259262
}
@@ -262,7 +265,10 @@ public void setErrorHandler(ErrorHandler errorHandler) {
262265
* Set the batch error handler to call when the listener throws an exception.
263266
* @param errorHandler the error handler.
264267
* @since 2.2
268+
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
269+
* @see #setCommonErrorHandler(CommonErrorHandler)
265270
*/
271+
@Deprecated
266272
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
267273
this.errorHandler = errorHandler;
268274
}
@@ -419,6 +425,7 @@ private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint)
419425
* @param instance the container instance to configure.
420426
* @param endpoint the endpoint.
421427
*/
428+
@SuppressWarnings("deprecation")
422429
protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
423430
ContainerProperties properties = instance.getContainerProperties();
424431
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,10 @@ public ApplicationEventPublisher getApplicationEventPublisher() {
207207
* Set the error handler to call when the listener throws an exception.
208208
* @param errorHandler the error handler.
209209
* @since 2.2
210+
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
211+
* @see #setCommonErrorHandler(CommonErrorHandler)
210212
*/
213+
@Deprecated
211214
public void setErrorHandler(ErrorHandler errorHandler) {
212215
this.errorHandler = errorHandler;
213216
}
@@ -216,16 +219,22 @@ public void setErrorHandler(ErrorHandler errorHandler) {
216219
* Set the error handler to call when the listener throws an exception.
217220
* @param errorHandler the error handler.
218221
* @since 2.2
222+
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
223+
* @see #setCommonErrorHandler(CommonErrorHandler)
219224
*/
220-
public void setGenericErrorHandler(GenericErrorHandler<?> errorHandler) {
225+
@Deprecated
226+
public void setGenericErrorHandler(@Nullable GenericErrorHandler<?> errorHandler) {
221227
this.errorHandler = errorHandler;
222228
}
223229

224230
/**
225231
* Set the batch error handler to call when the listener throws an exception.
226232
* @param errorHandler the error handler.
227233
* @since 2.2
234+
* @deprecated in favor of {@link #setCommonErrorHandler(CommonErrorHandler)}
235+
* @see #setCommonErrorHandler(CommonErrorHandler)
228236
*/
237+
@Deprecated
229238
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
230239
this.errorHandler = errorHandler;
231240
}
@@ -234,7 +243,10 @@ public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
234243
* Get the configured error handler.
235244
* @return the error handler.
236245
* @since 2.2
246+
* @deprecated in favor of {@link #getCommonErrorHandler()}
247+
* @see #getCommonErrorHandler()
237248
*/
249+
@Deprecated
238250
@Nullable
239251
public GenericErrorHandler<?> getGenericErrorHandler() {
240252
return this.errorHandler;

spring-kafka/src/main/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2021 the original author or authors.
2+
* Copyright 2015-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -212,6 +212,7 @@ protected void doStart() {
212212
}
213213
}
214214

215+
@SuppressWarnings("deprecation")
215216
private void configureChildContainer(int index, KafkaMessageListenerContainer<K, V> container) {
216217
String beanName = getBeanName();
217218
beanName = (beanName == null ? "consumer" : beanName) + "-" + index;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -768,7 +768,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
768768
this.commitCurrentOnAssignment = determineCommitCurrent(consumerProperties,
769769
KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties());
770770
subscribeOrAssignTopics(this.consumer);
771-
GenericErrorHandler<?> errHandler = getGenericErrorHandler();
772771
if (listener instanceof BatchMessageListener) {
773772
this.listener = null;
774773
this.batchListener = (BatchMessageListener<K, V>) listener;
@@ -792,7 +791,7 @@ else if (listener instanceof MessageListener) {
792791
this.listenerType = listenerType;
793792
this.isConsumerAwareListener = listenerType.equals(ListenerType.ACKNOWLEDGING_CONSUMER_AWARE)
794793
|| listenerType.equals(ListenerType.CONSUMER_AWARE);
795-
this.commonErrorHandler = determineCommonErrorHandler(errHandler);
794+
this.commonErrorHandler = determineCommonErrorHandler();
796795
Assert.state(!this.isBatchListener || !this.isRecordAck,
797796
"Cannot use AckMode.RECORD with a batch listener");
798797
if (this.containerProperties.getScheduler() != null) {
@@ -848,8 +847,10 @@ else if (this.commonRecordInterceptor != null) {
848847
}
849848

850849
@Nullable
851-
private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHandler<?> errHandler) {
850+
private CommonErrorHandler determineCommonErrorHandler() {
852851
CommonErrorHandler common = getCommonErrorHandler();
852+
@SuppressWarnings("deprecation")
853+
GenericErrorHandler<?> errHandler = getGenericErrorHandler();
853854
if (common != null) {
854855
if (errHandler != null) {
855856
this.logger.debug("GenericErrorHandler is ignored when a CommonErrorHandler is provided");
@@ -860,7 +861,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan
860861
return new DefaultErrorHandler();
861862
}
862863
if (this.isBatchListener) {
863-
validateErrorHandler(true);
864+
validateErrorHandler(true, errHandler);
864865
BatchErrorHandler batchErrorHandler = (BatchErrorHandler) errHandler;
865866
if (batchErrorHandler != null) {
866867
return new ErrorHandlerAdapter(batchErrorHandler);
@@ -870,7 +871,7 @@ private CommonErrorHandler determineCommonErrorHandler(@Nullable GenericErrorHan
870871
}
871872
}
872873
else {
873-
validateErrorHandler(false);
874+
validateErrorHandler(false, errHandler);
874875
ErrorHandler eh = (ErrorHandler) errHandler;
875876
if (eh != null) {
876877
return new ErrorHandlerAdapter(eh);
@@ -1201,8 +1202,7 @@ private void seekPartitions(Collection<TopicPartition> partitions, boolean idle)
12011202
}
12021203
}
12031204

1204-
private void validateErrorHandler(boolean batch) {
1205-
GenericErrorHandler<?> errHandler = KafkaMessageListenerContainer.this.getGenericErrorHandler();
1205+
private void validateErrorHandler(boolean batch, @Nullable GenericErrorHandler<?> errHandler) {
12061206
if (errHandler == null) {
12071207
return;
12081208
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1034,6 +1034,7 @@ public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>
10341034
}
10351035

10361036
@Bean
1037+
@SuppressWarnings("deprecation")
10371038
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
10381039
factoryWithBadConverter() {
10391040

@@ -1269,6 +1270,7 @@ public KafkaListenerContainerFactory<?> batchManualFactory2() {
12691270
}
12701271

12711272
@Bean
1273+
@SuppressWarnings("deprecation")
12721274
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
12731275
recordAckListenerContainerFactory() {
12741276

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerMockTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -124,7 +124,7 @@ else if (event instanceof ConsumerFailedToStartEvent) {
124124
exec.destroy();
125125
}
126126

127-
@SuppressWarnings({ "rawtypes", "unchecked" })
127+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
128128
@Test
129129
void testCorrectContainerForConsumerError() throws InterruptedException {
130130
ConsumerFactory consumerFactory = mock(ConsumerFactory.class);

spring-kafka/src/test/java/org/springframework/kafka/listener/ConcurrentMessageListenerContainerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -595,6 +595,7 @@ public ConsumerRecords<Integer, String> answer(InvocationOnMock invocation) thro
595595
container.stop();
596596
}
597597

598+
@SuppressWarnings("deprecation")
598599
@Test
599600
public void testListenerException() throws Exception {
600601
this.logger.info("Start exception");

spring-kafka/src/test/java/org/springframework/kafka/listener/ErrorHandlingDeserializerTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -151,6 +151,7 @@ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerCont
151151
return factory(cfWithExplicitDeserializers());
152152
}
153153

154+
@SuppressWarnings("deprecation")
154155
private ConcurrentKafkaListenerContainerFactory<String, String> factory(ConsumerFactory<String, String> cf) {
155156
ConcurrentKafkaListenerContainerFactory<String, String> factory =
156157
new ConcurrentKafkaListenerContainerFactory<>();

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1363,6 +1363,7 @@ else if (entry.getValue().offset() == 2) {
13631363
logger.info("Stop batch listener manual");
13641364
}
13651365

1366+
@SuppressWarnings("deprecation")
13661367
@Test
13671368
public void testBatchListenerErrors() throws Exception {
13681369
logger.info("Start batch listener errors");
@@ -1430,7 +1431,7 @@ public void testBatchListenerErrors() throws Exception {
14301431
logger.info("Stop batch listener errors");
14311432
}
14321433

1433-
@SuppressWarnings("unchecked")
1434+
@SuppressWarnings({ "unchecked", "deprecation" })
14341435
@Test
14351436
public void testBatchListenerAckAfterRecoveryMock() throws Exception {
14361437
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
@@ -2413,6 +2414,7 @@ public void testBadAckMode() {
24132414
}
24142415

24152416
@Test
2417+
@SuppressWarnings("deprecation")
24162418
public void testBadErrorHandler() {
24172419
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
24182420
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
@@ -2430,6 +2432,7 @@ public void testBadErrorHandler() {
24302432
}
24312433

24322434
@Test
2435+
@SuppressWarnings("deprecation")
24332436
public void testBadBatchErrorHandler() {
24342437
Map<String, Object> props = KafkaTestUtils.consumerProps("testStatic", "false", embeddedKafka);
24352438
DefaultKafkaConsumerFactory<Integer, Foo1> cf = new DefaultKafkaConsumerFactory<>(props);
@@ -2977,7 +2980,7 @@ public void testAckModeCount() throws Exception {
29772980
container.stop();
29782981
}
29792982

2980-
@SuppressWarnings({ "unchecked", "rawtypes" })
2983+
@SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
29812984
@Test
29822985
public void testCommitErrorHandlerCalled() throws Exception {
29832986
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);

spring-kafka/src/test/java/org/springframework/kafka/listener/RemainingRecordsErrorHandlerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -194,7 +194,7 @@ public Consumer consumer() {
194194
return consumer;
195195
}
196196

197-
@SuppressWarnings({ "rawtypes", "unchecked" })
197+
@SuppressWarnings({ "rawtypes", "unchecked", "deprecation" })
198198
@Bean
199199
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
200200
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();

spring-kafka/src/test/java/org/springframework/kafka/listener/RetryingBatchErrorHandlerIntegrationTests.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020-2021 the original author or authors.
2+
* Copyright 2020-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -77,6 +77,7 @@ public static void setup() {
7777
embeddedKafka = EmbeddedKafkaCondition.getBroker();
7878
}
7979

80+
@SuppressWarnings("deprecation")
8081
@Test
8182
public void testRetriesAndDlt() throws InterruptedException {
8283
Map<String, Object> props = KafkaTestUtils.consumerProps("retryBatch", "false", embeddedKafka);
@@ -143,6 +144,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
143144
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
144145
}
145146

147+
@SuppressWarnings("deprecation")
146148
@Test
147149
public void testRetriesCantRecover() throws InterruptedException {
148150
Map<String, Object> props = KafkaTestUtils.consumerProps("retryBatch2", "false", embeddedKafka);
@@ -213,7 +215,7 @@ public void accept(ConsumerRecord<?, ?> record, Exception exception) {
213215
assertThat(stopLatch.await(10, TimeUnit.SECONDS)).isTrue();
214216
}
215217

216-
@SuppressWarnings("unchecked")
218+
@SuppressWarnings({ "unchecked", "deprecation" })
217219
@Test
218220
void consumerEx() throws InterruptedException {
219221
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -151,6 +151,7 @@ class EnableKafkaKotlinTests {
151151
}
152152

153153
@Bean
154+
@Suppress("deprecation")
154155
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
155156
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
156157
= ConcurrentKafkaListenerContainerFactory()
@@ -167,6 +168,7 @@ class EnableKafkaKotlinTests {
167168
}
168169

169170
@Bean
171+
@Suppress("deprecation")
170172
fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
171173
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
172174
= ConcurrentKafkaListenerContainerFactory()

0 commit comments

Comments
 (0)