Skip to content

Commit c188304

Browse files
Wzy19930507 and Zhiyang.Wang1 authored
GH-2968: Fix DEH#handleBatchAndReturnRemaining
DefaultErrorHandler#handleBatchAndReturnRemaining performed an invalid recovery and could enter an infinite loop when the Kafka listener threw a BatchListenerFailedException and the failing record was the first one in the remaining list. * Address the empty catch block * Add a unit test Co-authored-by: Zhiyang.Wang1 <[email protected]>
1 parent cf63cab commit c188304

File tree

2 files changed

+167
-84
lines changed

2 files changed

+167
-84
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/FailedBatchProcessor.java

Lines changed: 36 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,10 @@
1616

1717
package org.springframework.kafka.listener;
1818

19-
import java.time.Duration;
2019
import java.util.ArrayList;
2120
import java.util.Collections;
2221
import java.util.HashMap;
2322
import java.util.HashSet;
24-
import java.util.Iterator;
2523
import java.util.List;
2624
import java.util.Map;
2725
import java.util.Set;
@@ -37,6 +35,7 @@
3735

3836
import org.springframework.kafka.KafkaException;
3937
import org.springframework.kafka.KafkaException.Level;
38+
import org.springframework.kafka.support.KafkaUtils;
4039
import org.springframework.lang.Nullable;
4140
import org.springframework.util.backoff.BackOff;
4241

@@ -50,6 +49,7 @@
5049
*
5150
* @author Gary Russell
5251
* @author Francois Rosiere
52+
* @author Wang Zhiyang
5353
* @since 2.8
5454
*
5555
*/
@@ -120,7 +120,7 @@ public void setReclassifyOnExceptionChange(boolean reclassifyOnExceptionChange)
120120
@Override
121121
protected void notRetryable(Stream<Class<? extends Exception>> notRetryable) {
122122
if (this.fallbackBatchHandler instanceof ExceptionClassifier handler) {
123-
notRetryable.forEach(ex -> handler.addNotRetryableExceptions(ex));
123+
notRetryable.forEach(handler::addNotRetryableExceptions);
124124
}
125125
}
126126

@@ -178,7 +178,6 @@ protected <K, V> ConsumerRecords<K, V> handle(Exception thrownException, Consume
178178
else {
179179
return String.format("Record not found in batch, index %d out of bounds (0, %d); "
180180
+ "re-seeking batch", index, data.count() - 1);
181-
182181
}
183182
});
184183
fallback(thrownException, data, consumer, container, invokeListener);
@@ -201,11 +200,9 @@ private int findIndex(ConsumerRecords<?, ?> data, ConsumerRecord<?, ?> record) {
201200
return -1;
202201
}
203202
int i = 0;
204-
Iterator<?> iterator = data.iterator();
205-
while (iterator.hasNext()) {
206-
ConsumerRecord<?, ?> candidate = (ConsumerRecord<?, ?>) iterator.next();
207-
if (candidate.topic().equals(record.topic()) && candidate.partition() == record.partition()
208-
&& candidate.offset() == record.offset()) {
203+
for (ConsumerRecord<?, ?> datum : data) {
204+
if (datum.topic().equals(record.topic()) && datum.partition() == record.partition()
205+
&& datum.offset() == record.offset()) {
209206
break;
210207
}
211208
i++;
@@ -220,29 +217,25 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
220217
if (data == null) {
221218
return ConsumerRecords.empty();
222219
}
223-
Iterator<?> iterator = data.iterator();
224-
List<ConsumerRecord<?, ?>> toCommit = new ArrayList<>();
225220
List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
226221
int index = indexArg;
227-
while (iterator.hasNext()) {
228-
ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) iterator.next();
222+
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
223+
for (ConsumerRecord<?, ?> datum : data) {
229224
if (index-- > 0) {
230-
toCommit.add(record);
225+
offsets.compute(new TopicPartition(datum.topic(), datum.partition()),
226+
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, datum.offset() + 1));
231227
}
232228
else {
233-
remaining.add(record);
229+
remaining.add(datum);
234230
}
235231
}
236-
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
237-
toCommit.forEach(rec -> offsets.compute(new TopicPartition(rec.topic(), rec.partition()),
238-
(key, val) -> ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1)));
239232
if (offsets.size() > 0) {
240233
commit(consumer, container, offsets);
241234
}
242235
if (isSeekAfterError()) {
243236
if (remaining.size() > 0) {
244237
SeekUtils.seekOrRecover(thrownException, remaining, consumer, container, false,
245-
getFailureTracker()::recovered, this.logger, getLogLevel());
238+
getFailureTracker(), this.logger, getLogLevel());
246239
ConsumerRecord<?, ?> recovered = remaining.get(0);
247240
commit(consumer, container,
248241
Collections.singletonMap(new TopicPartition(recovered.topic(), recovered.partition()),
@@ -254,35 +247,43 @@ private <K, V> ConsumerRecords<K, V> seekOrRecover(Exception thrownException, @N
254247
return ConsumerRecords.empty();
255248
}
256249
else {
257-
if (indexArg == 0) {
258-
return (ConsumerRecords<K, V>) data; // first record just rerun the whole thing
259-
}
260-
else {
250+
if (remaining.size() > 0) {
261251
try {
262252
if (getFailureTracker().recovered(remaining.get(0), thrownException, container,
263253
consumer)) {
264254
remaining.remove(0);
265255
}
266256
}
267257
catch (Exception e) {
258+
if (SeekUtils.isBackoffException(thrownException)) {
259+
this.logger.debug(e, () -> KafkaUtils.format(remaining.get(0))
260+
+ " included in remaining due to retry back off " + thrownException);
261+
}
262+
else {
263+
this.logger.error(e, KafkaUtils.format(remaining.get(0))
264+
+ " included in remaining due to " + thrownException);
265+
}
268266
}
269-
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
270-
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
271-
tp -> new ArrayList<ConsumerRecord<K, V>>()).add((ConsumerRecord<K, V>) rec));
272-
return new ConsumerRecords<>(remains);
273267
}
268+
if (remaining.isEmpty()) {
269+
return ConsumerRecords.empty();
270+
}
271+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remains = new HashMap<>();
272+
remaining.forEach(rec -> remains.computeIfAbsent(new TopicPartition(rec.topic(), rec.partition()),
273+
tp -> new ArrayList<>()).add((ConsumerRecord<K, V>) rec));
274+
return new ConsumerRecords<>(remains);
274275
}
275276
}
276277

277-
private void commit(Consumer<?, ?> consumer, MessageListenerContainer container, Map<TopicPartition, OffsetAndMetadata> offsets) {
278+
private void commit(Consumer<?, ?> consumer, MessageListenerContainer container,
279+
Map<TopicPartition, OffsetAndMetadata> offsets) {
278280

279-
boolean syncCommits = container.getContainerProperties().isSyncCommits();
280-
Duration timeout = container.getContainerProperties().getSyncCommitTimeout();
281-
if (syncCommits) {
282-
consumer.commitSync(offsets, timeout);
281+
ContainerProperties properties = container.getContainerProperties();
282+
if (properties.isSyncCommits()) {
283+
consumer.commitSync(offsets, properties.getSyncCommitTimeout());
283284
}
284285
else {
285-
OffsetCommitCallback commitCallback = container.getContainerProperties().getCommitCallback();
286+
OffsetCommitCallback commitCallback = properties.getCommitCallback();
286287
if (commitCallback == null) {
287288
commitCallback = LOGGING_COMMIT_CALLBACK;
288289
}
@@ -304,8 +305,8 @@ private BatchListenerFailedException getBatchListenerFailedException(Throwable t
304305
throwable = throwable.getCause();
305306
checked.add(throwable);
306307

307-
if (throwable instanceof BatchListenerFailedException) {
308-
target = (BatchListenerFailedException) throwable;
308+
if (throwable instanceof BatchListenerFailedException batchListenerFailedException) {
309+
target = batchListenerFailedException;
309310
break;
310311
}
311312
}

0 commit comments

Comments
 (0)