Skip to content

Commit ab64d4d

Browse files
garyrussellartembilan
authored andcommitted
GH-920: Suppress ERROR Log for Expected Exception
``` 2021-02-25 15:44:31.640 ERROR 73473 --- [etry-8000-0-C-1] o.s.k.l.SeekToCurrentErrorHandler : Failed to determine if this record (kgh920-retry-8000-0@5) should be recovererd, including in seeks ``` In `SeekUtils` when the nested exception is a `KafkaBackoffException`. Also fix some asciidoctor rendering in the docs.
1 parent e5e64e0 commit ab64d4d

File tree

2 files changed

+23
-9
lines changed

2 files changed

+23
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.kafka.common.TopicPartition;
3131
import org.apache.kafka.common.errors.SerializationException;
3232

33+
import org.springframework.core.NestedRuntimeException;
3334
import org.springframework.core.log.LogAccessor;
3435
import org.springframework.kafka.KafkaException;
3536
import org.springframework.kafka.KafkaException.Level;
@@ -105,8 +106,15 @@ public static boolean doSeeks(List<ConsumerRecord<?, ?>> records, Consumer<?, ?>
105106
skipped.set(test);
106107
}
107108
catch (Exception ex) {
108-
logger.error(ex, "Failed to determine if this record (" + ListenerUtils.recordToString(record)
109-
+ ") should be recovererd, including in seeks");
109+
if (isBackoffException(ex)) {
110+
logger.debug(ex, () -> ListenerUtils.recordToString(record)
111+
+ " included in seeks due to retry back off");
112+
}
113+
else {
114+
logger.error(ex, () -> "Failed to determine if this record ("
115+
+ ListenerUtils.recordToString(record)
116+
+ ") should be recovererd, including in seeks");
117+
}
110118
skipped.set(false);
111119
}
112120
if (skipped.get()) {
@@ -221,4 +229,15 @@ public static void seekOrRecover(Exception thrownException, List<ConsumerRecord<
221229
}
222230
}
223231

232+
/**
233+
* Return true if the exception is a {@link KafkaBackoffException}.
234+
* @param exception the exception.
235+
* @return true if it's a back off.
236+
* @since 2.7
237+
*/
238+
public static boolean isBackoffException(Exception exception) {
239+
return NestedRuntimeException.class.isAssignableFrom(exception.getClass())
240+
&& ((NestedRuntimeException) exception).contains(KafkaBackoffException.class);
241+
}
242+
224243
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.springframework.core.NestedRuntimeException;
3030
import org.springframework.kafka.core.KafkaOperations;
3131
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
32-
import org.springframework.kafka.listener.KafkaBackoffException;
32+
import org.springframework.kafka.listener.SeekUtils;
3333
import org.springframework.kafka.support.KafkaHeaders;
3434

3535
/**
@@ -96,7 +96,7 @@ public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublis
9696
}
9797

9898
private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e) {
99-
if (isBackoffException(e)) {
99+
if (SeekUtils.isBackoffException(e)) {
100100
throw (NestedRuntimeException) e; // Necessary to not commit the offset and seek to current again
101101
}
102102

@@ -112,11 +112,6 @@ private TopicPartition resolveDestination(ConsumerRecord<?, ?> cr, Exception e)
112112
cr.partition() % nextDestination.getDestinationPartitions());
113113
}
114114

115-
private boolean isBackoffException(Exception e) {
116-
return NestedRuntimeException.class.isAssignableFrom(e.getClass())
117-
&& ((NestedRuntimeException) e).contains(KafkaBackoffException.class);
118-
}
119-
120115
private int getAttempts(ConsumerRecord<?, ?> consumerRecord) {
121116
Header header = consumerRecord.headers().lastHeader(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS);
122117
return header != null

0 commit comments

Comments
 (0)