Skip to content

Commit f68ef3d

Browse files
authored
GH-1653: Fix resetStateOnExceptionChange
Resolves #1653 Need to unwrap the cause of `ListenerExecutionFailedException` to determine if the exception type changed. * Add gradle plugin repo
1 parent fa50aa0 commit f68ef3d

File tree

3 files changed

+12
-5
lines changed

3 files changed

+12
-5
lines changed

build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ buildscript {
22
ext.kotlinVersion = '1.4.20'
33
repositories {
44
mavenCentral()
5+
maven { url 'https://plugins.gradle.org/m2' }
56
maven { url 'https://repo.spring.io/plugins-release' }
67
}
78
dependencies {

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,18 +149,24 @@ boolean skip(ConsumerRecord<?, ?> record, Exception exception) {
149149
private FailedRecord getFailedRecordInstance(ConsumerRecord<?, ?> record, Exception exception,
150150
Map<TopicPartition, FailedRecord> map, TopicPartition topicPartition) {
151151

152+
Exception realException = exception;
153+
if (realException instanceof ListenerExecutionFailedException
154+
&& realException.getCause() instanceof Exception) {
155+
156+
realException = (Exception) realException.getCause();
157+
}
152158
FailedRecord failedRecord = map.get(topicPartition);
153159
if (failedRecord == null || failedRecord.getOffset() != record.offset()
154160
|| (this.resetStateOnExceptionChange
155-
&& !exception.getClass().isInstance(failedRecord.getLastException()))) {
161+
&& !realException.getClass().isInstance(failedRecord.getLastException()))) {
156162

157-
failedRecord = new FailedRecord(record.offset(), determineBackOff(record, exception).start());
163+
failedRecord = new FailedRecord(record.offset(), determineBackOff(record, realException).start());
158164
map.put(topicPartition, failedRecord);
159165
}
160166
else {
161167
failedRecord.getDeliveryAttempts().incrementAndGet();
162168
}
163-
failedRecord.setLastException(exception);
169+
failedRecord.setLastException(realException);
164170
return failedRecord;
165171
}
166172

spring-kafka/src/test/java/org/springframework/kafka/listener/FailedRecordTrackerTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,9 @@ void exceptionChanges(boolean reset) {
171171
.isSameAs(be1);
172172
TopicPartitionOffset tpo = new TopicPartitionOffset("foo", 0, 0L);
173173
assertThat(tracker.deliveryAttempt(tpo)).isEqualTo(2);
174-
tracker.skip(record1, new IllegalStateException());
174+
tracker.skip(record1, new ListenerExecutionFailedException("test", new IllegalStateException()));
175175
assertThat(tracker.deliveryAttempt(tpo)).isEqualTo(3);
176-
tracker.skip(record1, new IllegalArgumentException());
176+
tracker.skip(record1, new ListenerExecutionFailedException("test", new IllegalArgumentException()));
177177
if (reset) {
178178
assertThat(tracker.deliveryAttempt(tpo)).isEqualTo(2);
179179
assertThat(KafkaTestUtils.getPropertyValue(failures.get()

0 commit comments

Comments
 (0)