Skip to content

Commit abe1209

Browse files
garyrussell and artembilan
authored and committed
GH-1918: Don't Retry DLT Fatal Exceptions
Resolves #1918 Do not publish records with fatal exceptions to the same topic, it causes an irrecoverable endless loop. Log at ERROR when such a situation is detected. Extract a new abstract super class for exception classification.
1 parent 29a3bd2 commit abe1209

File tree

8 files changed

+317
-146
lines changed

8 files changed

+317
-146
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -532,11 +532,11 @@ public class MyCustomDltProcessor {
532532

533533
NOTE: If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.
534534

535-
===== Dlt Failure Behavior
535+
===== DLT Failure Behavior
536536

537-
Should the Dlt processing fail, there are two possible behaviors available: `ALWAYS_RETRY_ON_ERROR` and `FAIL_ON_ERROR`.
537+
Should the DLT processing fail, there are two possible behaviors available: `ALWAYS_RETRY_ON_ERROR` and `FAIL_ON_ERROR`.
538538

539-
In the former the message is forwarded back to the dlt topic so it doesn't block other dlt messages' processing.
539+
In the former the record is forwarded back to the DLT topic so it doesn't block other DLT records' processing.
540540
In the latter the consumer ends the execution without forwarding the message.
541541

542542
====
@@ -566,7 +566,21 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
566566

567567
NOTE: The default behavior is to `ALWAYS_RETRY_ON_ERROR`.
568568

569-
===== Configuring No Dlt
569+
IMPORTANT: Starting with version 2.8, `ALWAYS_RETRY_ON_ERROR` will NOT route a record back to the DLT if the record causes a fatal exception to be thrown,
570+
such as a `DeserializationException`, because such exceptions will generally be thrown again on every delivery attempt.
571+
572+
Exceptions that are considered fatal are
573+
574+
* `DeserializationException`
575+
* `MessageConversionException`
576+
* `ConversionException`
577+
* `MethodArgumentResolutionException`
578+
* `NoSuchMethodException`
579+
* `ClassCastException`
580+
581+
You can add exceptions to and remove exceptions from this list using methods on the `DeadLetterPublishingRecovererFactory` bean.
582+
583+
===== Configuring No DLT
570584

571585
The framework also provides the possibility of not configuring a DLT for the topic.
572586
In this case, after retries are exhausted, the processing simply ends.

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
* @since 2.2
6565
*
6666
*/
67-
public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecoverer {
67+
public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer {
6868

6969
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR
7070

@@ -305,6 +305,14 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
305305
TopicPartition tp = this.destinationResolver.apply(record, exception);
306306
if (tp == null) {
307307
maybeThrow(record, exception);
308+
this.logger.debug(() -> "Recovery of " + ListenerUtils.recordToString(record, true)
309+
+ " skipped because destination resolver returned null");
310+
return;
311+
}
312+
if (tp.topic().equals(record.topic()) && !getClassifier().classify(exception)) {
313+
this.logger.error("Recovery of " + ListenerUtils.recordToString(record, true)
314+
+ " skipped because not retryable exception " + exception.toString()
315+
+ " and the destination resolver routed back to the same topic");
308316
return;
309317
}
310318
if (consumer != null && this.verifyPartition) {
@@ -320,7 +328,7 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
320328
kDeserEx == null ? null : kDeserEx.getData(), vDeserEx == null ? null : vDeserEx.getData());
321329
KafkaOperations<Object, Object> kafkaTemplate =
322330
(KafkaOperations<Object, Object>) this.templateResolver.apply(outRecord);
323-
sendOrThrow(outRecord, kafkaTemplate);
331+
sendOrThrow(outRecord, kafkaTemplate, record);
324332
}
325333

326334
private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
@@ -345,10 +353,10 @@ private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception excepti
345353
}
346354

347355
private void sendOrThrow(ProducerRecord<Object, Object> outRecord,
348-
@Nullable KafkaOperations<Object, Object> kafkaTemplate) {
356+
@Nullable KafkaOperations<Object, Object> kafkaTemplate, ConsumerRecord<?, ?> inRecord) {
349357

350358
if (kafkaTemplate != null) {
351-
send(outRecord, kafkaTemplate);
359+
send(outRecord, kafkaTemplate, inRecord);
352360
}
353361
else {
354362
throw new IllegalArgumentException("No kafka template returned for record " + outRecord);
@@ -369,17 +377,20 @@ private void maybeThrow(ConsumerRecord<?, ?> record, Exception exception) {
369377
* Send the record.
370378
* @param outRecord the record.
371379
* @param kafkaTemplate the template.
380+
* @param inRecord the consumer record.
372381
* @since 2.7
373382
*/
374-
protected void send(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
383+
protected void send(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
384+
ConsumerRecord<?, ?> inRecord) {
385+
375386
if (this.transactional && !kafkaTemplate.inTransaction() && !kafkaTemplate.isAllowNonTransactional()) {
376387
kafkaTemplate.executeInTransaction(t -> {
377-
publish(outRecord, t);
388+
publish(outRecord, t, inRecord);
378389
return null;
379390
});
380391
}
381392
else {
382-
publish(outRecord, kafkaTemplate);
393+
publish(outRecord, kafkaTemplate, inRecord);
383394
}
384395
}
385396

@@ -409,7 +420,8 @@ private TopicPartition checkPartition(TopicPartition tp, Consumer<?, ?> consumer
409420

410421
@SuppressWarnings("unchecked")
411422
private KafkaOperations<Object, Object> findTemplateForValue(@Nullable Object value,
412-
Map<Class<?>, KafkaOperations<?, ?>> templates) {
423+
Map<Class<?>, KafkaOperations<?, ?>> templates) {
424+
413425
if (value == null) {
414426
KafkaOperations<?, ?> operations = templates.get(Void.class);
415427
if (operations == null) {
@@ -461,46 +473,55 @@ protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?,
461473
* Override this if you want more than just logging of the send result.
462474
* @param outRecord the record to send.
463475
* @param kafkaTemplate the template.
476+
* @param inRecord the consumer record.
464477
* @since 2.2.5
465478
*/
466-
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
479+
protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate,
480+
ConsumerRecord<?, ?> inRecord) {
481+
467482
ListenableFuture<SendResult<Object, Object>> sendResult = null;
468483
try {
469484
sendResult = kafkaTemplate.send(outRecord);
470485
sendResult.addCallback(result -> {
471-
this.logger.debug(() -> "Successful dead-letter publication: " + result);
486+
this.logger.debug(() -> "Successful dead-letter publication: "
487+
+ ListenerUtils.recordToString(inRecord, true) + " to " + result.getRecordMetadata());
472488
}, ex -> {
473-
this.logger.error(ex, () -> "Dead-letter publication failed for: " + outRecord);
489+
this.logger.error(ex, () -> pubFailMessage(outRecord, inRecord));
474490
});
475491
}
476492
catch (Exception e) {
477-
this.logger.error(e, () -> "Dead-letter publication failed for: " + outRecord);
493+
this.logger.error(e, () -> pubFailMessage(outRecord, inRecord));
478494
}
479495
if (this.failIfSendResultIsError) {
480-
verifySendResult(kafkaTemplate, outRecord, sendResult);
496+
verifySendResult(kafkaTemplate, outRecord, sendResult, inRecord);
481497
}
482498
}
483499

484500
private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
485501
ProducerRecord<Object, Object> outRecord,
486-
@Nullable ListenableFuture<SendResult<Object, Object>> sendResult) {
502+
@Nullable ListenableFuture<SendResult<Object, Object>> sendResult, ConsumerRecord<?, ?> inRecord) {
487503

488504
Duration sendTimeout = determineSendTimeout(kafkaTemplate);
489505
if (sendResult == null) {
490-
throw new KafkaException("Dead-letter publication failed for: " + outRecord);
506+
throw new KafkaException(pubFailMessage(outRecord, inRecord));
491507
}
492508
try {
493509
sendResult.get(sendTimeout.toMillis(), TimeUnit.MILLISECONDS);
494510
}
495511
catch (InterruptedException e) {
496512
Thread.currentThread().interrupt();
497-
throw new KafkaException("Publication failed for: " + outRecord, e);
513+
throw new KafkaException(pubFailMessage(outRecord, inRecord));
498514
}
499515
catch (ExecutionException | TimeoutException e) {
500-
throw new KafkaException("Publication failed for: " + outRecord, e);
516+
throw new KafkaException(pubFailMessage(outRecord, inRecord), e);
501517
}
502518
}
503519

520+
private String pubFailMessage(ProducerRecord<Object, Object> outRecord, ConsumerRecord<?, ?> inRecord) {
521+
return "Dead-letter publication to "
522+
+ outRecord.topic() + "failed for: " + ListenerUtils.recordToString(inRecord, true);
523+
}
524+
504525
private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
505526
ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
506527
if (producerFactory != null) { // NOSONAR - will only occur in mock tests
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
/*
 * Copyright 2021 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.listener;

import java.util.HashMap;
import java.util.Map;

import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.kafka.support.converter.ConversionException;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.handler.invocation.MethodArgumentResolutionException;
import org.springframework.util.Assert;

/**
 * Supports exception classification. Subclasses use the classifier to decide whether an
 * exception is retryable ({@code true}) or fatal ({@code false}); causes are traversed.
 *
 * @author Gary Russell
 * @since 2.8
 *
 */
public abstract class ExceptionClassifier extends KafkaExceptionLogLevelAware {

	private ExtendedBinaryExceptionClassifier classifier;

	/**
	 * Construct the instance with the default fatal-exception classifications.
	 */
	public ExceptionClassifier() {
		this.classifier = configureDefaultClassifier();
	}

	// Build the default classifier: the listed exception types are considered fatal
	// (classify() returns false); all other exceptions default to retryable (true).
	private static ExtendedBinaryExceptionClassifier configureDefaultClassifier() {
		Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
		classified.put(DeserializationException.class, false);
		classified.put(MessageConversionException.class, false);
		classified.put(ConversionException.class, false);
		classified.put(MethodArgumentResolutionException.class, false);
		classified.put(NoSuchMethodException.class, false);
		classified.put(ClassCastException.class, false);
		return new ExtendedBinaryExceptionClassifier(classified, true);
	}

	/**
	 * Return the exception classifier.
	 * @return the classifier.
	 */
	protected BinaryExceptionClassifier getClassifier() {
		return this.classifier;
	}

	/**
	 * Set an exception classifications to determine whether the exception should cause a retry
	 * (until exhaustion) or not. If not, we go straight to the recoverer. By default,
	 * the following exceptions will not be retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link ConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * When calling this method, the defaults will not be applied.
	 * @param classifications the classifications.
	 * @param defaultValue whether or not to retry non-matching exceptions.
	 * @see BinaryExceptionClassifier#BinaryExceptionClassifier(Map, boolean)
	 * @see #addNotRetryableExceptions(Class...)
	 */
	public void setClassifications(Map<Class<? extends Throwable>, Boolean> classifications, boolean defaultValue) {
		Assert.notNull(classifications, "'classifications' cannot be null");
		this.classifier = new ExtendedBinaryExceptionClassifier(classifications, defaultValue);
	}

	/**
	 * Add exception types to the default list. By default, the following exceptions will
	 * not be retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link ConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionTypes the exception types.
	 * @see #removeNotRetryableException(Class)
	 * @see #setClassifications(Map, boolean)
	 */
	@SafeVarargs
	@SuppressWarnings("varargs")
	public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
		Assert.notNull(exceptionTypes, "'exceptionTypes' cannot be null");
		Assert.noNullElements(exceptionTypes, "'exceptionTypes' cannot contain nulls");
		for (Class<? extends Exception> exceptionType : exceptionTypes) {
			// Guards against raw-type callers defeating the generic bound.
			Assert.isTrue(Exception.class.isAssignableFrom(exceptionType),
					() -> "exceptionType " + exceptionType + " must be an Exception");
			this.classifier.getClassified().put(exceptionType, false);
		}
	}

	/**
	 * Remove an exception type from the configured list. By default, the following
	 * exceptions will not be retried:
	 * <ul>
	 * <li>{@link DeserializationException}</li>
	 * <li>{@link MessageConversionException}</li>
	 * <li>{@link ConversionException}</li>
	 * <li>{@link MethodArgumentResolutionException}</li>
	 * <li>{@link NoSuchMethodException}</li>
	 * <li>{@link ClassCastException}</li>
	 * </ul>
	 * All others will be retried.
	 * @param exceptionType the exception type.
	 * @return true if the removal was successful.
	 * @see #addNotRetryableExceptions(Class...)
	 * @see #setClassifications(Map, boolean)
	 */
	public boolean removeNotRetryableException(Class<? extends Exception> exceptionType) {
		return this.classifier.getClassified().remove(exceptionType);
	}

	/**
	 * Extended to provide visibility to the current classified exceptions.
	 *
	 * @author Gary Russell
	 *
	 */
	@SuppressWarnings("serial")
	private static final class ExtendedBinaryExceptionClassifier extends BinaryExceptionClassifier {

		ExtendedBinaryExceptionClassifier(Map<Class<? extends Throwable>, Boolean> typeMap, boolean defaultValue) {
			super(typeMap, defaultValue);
			// Classify on causes too, so wrapped fatal exceptions are still detected.
			setTraverseCauses(true);
		}

		@Override
		protected Map<Class<? extends Throwable>, Boolean> getClassified() { // NOSONAR worthless override
			return super.getClassified();
		}

	}

}

0 commit comments

Comments
 (0)