@@ -761,6 +761,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
761
761
762
762
private ConsumerRecords <K , V > pendingRecordsAfterError ;
763
763
764
+ private boolean pauseForPending ;
765
+
764
766
private volatile boolean consumerPaused ;
765
767
766
768
private volatile Thread consumerThread ;
@@ -1547,7 +1549,10 @@ private ConsumerRecords<K, V> doPoll() {
1547
1549
+ "after an error; emergency stop invoked to avoid message loss" , howManyRecords ));
1548
1550
KafkaMessageListenerContainer .this .emergencyStop .run ();
1549
1551
}
1550
- if (!isPartitionPaused (this .pendingRecordsAfterError .partitions ().iterator ().next ())) {
1552
+ TopicPartition firstPart = this .pendingRecordsAfterError .partitions ().iterator ().next ();
1553
+ boolean isPaused = isPartitionPauseRequested (firstPart );
1554
+ this .logger .debug (() -> "First pending after error: " + firstPart + "; paused: " + isPaused );
1555
+ if (!isPaused ) {
1551
1556
records = this .pendingRecordsAfterError ;
1552
1557
this .pendingRecordsAfterError = null ;
1553
1558
}
@@ -1663,10 +1668,11 @@ private void doPauseConsumerIfNecessary() {
1663
1668
this .logger .debug (() -> "Pausing for incomplete async acks: " + this .offsetsInThisBatch );
1664
1669
}
1665
1670
if (!this .consumerPaused && (isPaused () || this .pausedForAsyncAcks )
1666
- || this .pendingRecordsAfterError != null ) {
1671
+ || this .pauseForPending ) {
1667
1672
1668
1673
this .consumer .pause (this .consumer .assignment ());
1669
1674
this .consumerPaused = true ;
1675
+ this .pauseForPending = false ;
1670
1676
this .logger .debug (() -> "Paused consumption from: " + this .consumer .paused ());
1671
1677
publishConsumerPausedEvent (this .consumer .assignment ());
1672
1678
}
@@ -2381,6 +2387,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
2381
2387
() -> invokeBatchOnMessageWithRecordsOrList (records , list ));
2382
2388
if (!afterHandling .isEmpty ()) {
2383
2389
this .pendingRecordsAfterError = afterHandling ;
2390
+ this .pauseForPending = true ;
2384
2391
}
2385
2392
}
2386
2393
}
@@ -2786,6 +2793,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
2786
2793
}
2787
2794
if (records .size () > 0 ) {
2788
2795
this .pendingRecordsAfterError = new ConsumerRecords <>(records );
2796
+ this .pauseForPending = true ;
2789
2797
}
2790
2798
}
2791
2799
}
0 commit comments