Skip to content

Commit 0c5867e

Browse files
garyrussellartembilan
authored andcommitted
GH-1591: Apply BackOff in DARP with Batch Listener
Resolves #1591 `BackOff` was ignored for batch listeners (not recoverable). - move code from `SeekToCurrentBatchErrorHandler` to `ListenerUtils` - call from both STCEH and DARP - also clear the thread state (in both) if a batch fails and subsequently succeeds **I will do the backports - conflicts are expected**
1 parent 0106acf commit 0c5867e

File tree

7 files changed

+118
-27
lines changed

7 files changed

+118
-27
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.lang.Nullable;
3333
import org.springframework.util.Assert;
3434
import org.springframework.util.backoff.BackOff;
35+
import org.springframework.util.backoff.BackOffExecution;
3536

3637
/**
3738
* Default implementation of {@link AfterRollbackProcessor}. Seeks all
@@ -52,6 +53,12 @@
5253
public class DefaultAfterRollbackProcessor<K, V> extends FailedRecordProcessor
5354
implements AfterRollbackProcessor<K, V>, InitializingBean {
5455

56+
private final ThreadLocal<BackOffExecution> backOffs = new ThreadLocal<>(); // Intentionally not static
57+
58+
private final ThreadLocal<Long> lastIntervals = new ThreadLocal<>(); // Intentionally not static
59+
60+
private final BackOff backOff;
61+
5562
private KafkaOperations<?, ?> kafkaTemplate;
5663

5764
/**
@@ -115,6 +122,7 @@ public DefaultAfterRollbackProcessor(@Nullable BiConsumer<ConsumerRecord<?, ?>,
115122
this.kafkaTemplate = kafkaOperations;
116123
super.setCommitRecovered(commitRecovered);
117124
checkConfig();
125+
this.backOff = backOff;
118126
}
119127

120128
@Override
@@ -156,6 +164,10 @@ && isCommitRecovered() && this.kafkaTemplate.isTransactional()) {
156164
new OffsetAndMetadata(skipped.offset() + 1)), consumer.groupMetadata());
157165
}
158166
}
167+
168+
if (!recoverable && this.backOff != null) {
169+
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals);
170+
}
159171
}
160172

161173
@Override
@@ -210,4 +222,11 @@ public void setKafkaOperations(KafkaOperations<K, V> kafkaOperations) {
210222
this.kafkaTemplate = kafkaOperations;
211223
}
212224

225+
@Override
226+
public void clearThreadState() {
227+
super.clearThreadState();
228+
this.backOffs.remove();
229+
this.lastIntervals.remove();
230+
}
231+
213232
}

spring-kafka/src/main/java/org/springframework/kafka/listener/GenericErrorHandler.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ default void handle(Exception thrownException, T data, Consumer<?, ?> consumer)
5353
* @since 2.3
5454
*/
5555
default void clearThreadState() {
56-
// NOSONAR
5756
}
5857

5958
/**

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -601,6 +601,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
601601

602602
private boolean wasIdle;
603603

604+
private boolean batchFailed;
605+
604606
private volatile boolean consumerPaused;
605607

606608
private volatile Thread consumerThread;
@@ -1605,6 +1607,11 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
16051607
try {
16061608
invokeBatchOnMessage(records, recordList);
16071609
successTimer(sample);
1610+
if (this.batchFailed) {
1611+
this.batchFailed = false;
1612+
this.batchErrorHandler.clearThreadState();
1613+
getAfterRollbackProcessor().clearThreadState();
1614+
}
16081615
}
16091616
catch (RuntimeException e) {
16101617
failureTimer(sample);
@@ -1616,6 +1623,7 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
16161623
throw e;
16171624
}
16181625
try {
1626+
this.batchFailed = true;
16191627
invokeBatchErrorHandler(records, recordList, e);
16201628
// unlikely, but possible, that a batch error handler "handles" the error
16211629
if ((!acked && !this.autoCommit && this.batchErrorHandler.isAckAfterHandle())

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.springframework.kafka.support.serializer.DeserializationException;
3030
import org.springframework.lang.Nullable;
3131
import org.springframework.util.Assert;
32+
import org.springframework.util.backoff.BackOff;
33+
import org.springframework.util.backoff.BackOffExecution;
3234

3335
/**
3436
* Listener utilities.
@@ -142,5 +144,41 @@ public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
142144
}
143145
}
144146

147+
/**
148+
* Sleep according to the {@link BackOff}; when the {@link BackOffExecution} returns
149+
* {@link BackOffExecution#STOP} sleep for the previous backOff.
150+
* @param backOff the {@link BackOff} to create a new {@link BackOffExecution}.
151+
* @param executions a thread local containing the {@link BackOffExecution} for this
152+
* thread.
153+
* @param lastIntervals a thread local containing the previous {@link BackOff}
154+
* interval for this thread.
155+
* @since 2.3.12
156+
*/
157+
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
158+
ThreadLocal<Long> lastIntervals) {
159+
160+
BackOffExecution backOffExecution = executions.get();
161+
if (backOffExecution == null) {
162+
backOffExecution = backOff.start();
163+
executions.set(backOffExecution);
164+
}
165+
Long interval = backOffExecution.nextBackOff();
166+
if (interval == BackOffExecution.STOP) {
167+
interval = lastIntervals.get();
168+
if (interval == null) {
169+
interval = Long.valueOf(0);
170+
}
171+
}
172+
lastIntervals.set(interval);
173+
if (interval > 0) {
174+
try {
175+
Thread.sleep(interval);
176+
}
177+
catch (@SuppressWarnings("unused") InterruptedException e) {
178+
Thread.currentThread().interrupt();
179+
}
180+
}
181+
}
182+
145183
}
146184

spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandler.java

Lines changed: 3 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class SeekToCurrentBatchErrorHandler extends KafkaExceptionLogLevelAware
3939

4040
private final ThreadLocal<BackOffExecution> backOffs = new ThreadLocal<>(); // Intentionally not static
4141

42-
private final ThreadLocal<Long> lastInterval = new ThreadLocal<>(); // Intentionally not static
42+
private final ThreadLocal<Long> lastIntervals = new ThreadLocal<>(); // Intentionally not static
4343

4444
private BackOff backOff;
4545

@@ -66,27 +66,7 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consum
6666
.forEach(consumer::seek);
6767

6868
if (this.backOff != null) {
69-
BackOffExecution backOffExecution = this.backOffs.get();
70-
if (backOffExecution == null) {
71-
backOffExecution = this.backOff.start();
72-
this.backOffs.set(backOffExecution);
73-
}
74-
Long interval = backOffExecution.nextBackOff();
75-
if (interval == BackOffExecution.STOP) {
76-
interval = this.lastInterval.get();
77-
if (interval == null) {
78-
interval = Long.valueOf(0);
79-
}
80-
}
81-
this.lastInterval.set(interval);
82-
if (interval > 0) {
83-
try {
84-
Thread.sleep(interval);
85-
}
86-
catch (@SuppressWarnings("unused") InterruptedException e) {
87-
Thread.currentThread().interrupt();
88-
}
89-
}
69+
ListenerUtils.unrecoverableBackOff(this.backOff, this.backOffs, this.lastIntervals);
9070
}
9171

9272
throw new KafkaException("Seek to current after exception", getLogLevel(), thrownException);
@@ -95,7 +75,7 @@ public void handle(Exception thrownException, ConsumerRecords<?, ?> data, Consum
9575
@Override
9676
public void clearThreadState() {
9777
this.backOffs.remove();
98-
this.lastInterval.remove();
78+
this.lastIntervals.remove();
9979
}
10080

10181
}

spring-kafka/src/test/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessorTests.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import static org.mockito.ArgumentMatchers.any;
2121
import static org.mockito.ArgumentMatchers.anyMap;
2222
import static org.mockito.BDDMockito.given;
23+
import static org.mockito.BDDMockito.willAnswer;
2324
import static org.mockito.Mockito.inOrder;
2425
import static org.mockito.Mockito.mock;
2526
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.spy;
2628
import static org.mockito.Mockito.times;
2729
import static org.mockito.Mockito.verify;
2830

@@ -41,6 +43,9 @@
4143
import org.springframework.kafka.core.KafkaOperations;
4244
import org.springframework.kafka.listener.ContainerProperties.EOSMode;
4345
import org.springframework.kafka.support.serializer.DeserializationException;
46+
import org.springframework.util.backoff.BackOff;
47+
import org.springframework.util.backoff.BackOffExecution;
48+
import org.springframework.util.backoff.FixedBackOff;
4449

4550
/**
4651
* @author Gary Russell
@@ -50,7 +55,7 @@
5055
public class DefaultAfterRollbackProcessorTests {
5156

5257
@Test
53-
public void testClassifier() {
58+
void testClassifier() {
5459
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
5560
AtomicBoolean recovererShouldFail = new AtomicBoolean(false);
5661
@SuppressWarnings("unchecked")
@@ -92,4 +97,36 @@ public void testClassifier() {
9297
inOrder.verifyNoMoreInteractions();
9398
}
9499

100+
@Test
101+
void testBatchBackOff() {
102+
AtomicReference<ConsumerRecord<?, ?>> recovered = new AtomicReference<>();
103+
@SuppressWarnings("unchecked")
104+
KafkaOperations<String, String> template = mock(KafkaOperations.class);
105+
given(template.isTransactional()).willReturn(true);
106+
BackOff backOff = spy(new FixedBackOff(0, 1));
107+
AtomicReference<BackOffExecution> execution = new AtomicReference<>();
108+
willAnswer(inv -> {
109+
BackOffExecution exec = spy((BackOffExecution) inv.callRealMethod());
110+
execution.set(exec);
111+
return exec;
112+
}).given(backOff).start();
113+
ConsumerRecordRecoverer recoverer = mock(ConsumerRecordRecoverer.class);
114+
DefaultAfterRollbackProcessor<String, String> processor = new DefaultAfterRollbackProcessor<>(recoverer,
115+
backOff, template, false);
116+
ConsumerRecord<String, String> record1 = new ConsumerRecord<>("foo", 0, 0L, "foo", "bar");
117+
ConsumerRecord<String, String> record2 = new ConsumerRecord<>("foo", 1, 1L, "foo", "bar");
118+
List<ConsumerRecord<String, String>> records = Arrays.asList(record1, record2);
119+
IllegalStateException illegalState = new IllegalStateException();
120+
@SuppressWarnings("unchecked")
121+
Consumer<String, String> consumer = mock(Consumer.class);
122+
given(consumer.groupMetadata()).willReturn(new ConsumerGroupMetadata("foo"));
123+
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
124+
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
125+
verify(backOff, times(2)).start();
126+
verify(execution.get(), times(2)).nextBackOff();
127+
processor.clearThreadState();
128+
processor.process(records, consumer, illegalState, false, EOSMode.BETA);
129+
verify(backOff, times(3)).start();
130+
}
131+
95132
}

spring-kafka/src/test/java/org/springframework/kafka/listener/SeekToCurrentBatchErrorHandlerTests.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import static org.mockito.BDDMockito.willReturn;
2828
import static org.mockito.Mockito.inOrder;
2929
import static org.mockito.Mockito.mock;
30+
import static org.mockito.Mockito.spy;
31+
import static org.mockito.Mockito.verify;
3032

3133
import java.time.Duration;
3234
import java.util.Arrays;
@@ -67,6 +69,7 @@
6769
import org.springframework.kafka.transaction.KafkaTransactionManager;
6870
import org.springframework.test.annotation.DirtiesContext;
6971
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
72+
import org.springframework.util.backoff.BackOff;
7073
import org.springframework.util.backoff.FixedBackOff;
7174

7275
/**
@@ -135,10 +138,17 @@ void testBackOff() {
135138
long t1 = System.currentTimeMillis();
136139
for (int i = 0; i < 10; i++) {
137140
assertThatThrownBy(() -> eh.handle(ex, crs, mock(Consumer.class), mock(MessageListenerContainer.class)))
138-
.isInstanceOf(KafkaException.class)
139-
.hasCause(ex);
141+
.isInstanceOf(KafkaException.class)
142+
.hasCause(ex);
140143
}
141144
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(100L);
145+
eh.clearThreadState();
146+
BackOff backOff = spy(new FixedBackOff(0L, 0L));
147+
eh.setBackOff(backOff);
148+
assertThatThrownBy(() -> eh.handle(ex, crs, mock(Consumer.class), mock(MessageListenerContainer.class)))
149+
.isInstanceOf(KafkaException.class)
150+
.hasCause(ex);
151+
verify(backOff).start();
142152
}
143153

144154
@SuppressWarnings({ "unchecked", "rawtypes" })

0 commit comments

Comments
 (0)