Skip to content

Commit 9f58014

Browse files
Gh 1716 - Refactoring and Test Coverage (#1734)
* Fixes GH-1716. * GH-1716 - Fixing Copy/Paste artifact. * GH-1716 - Refactorings and improved test coverage. * Checkstyle adjustments. * Code review adjustments. * Adjust test names and exception inheritance
1 parent c24ce2c commit 9f58014

33 files changed

+1430
-783
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,27 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
2323

2424
IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations
2525

26+
=== Back Off Delay Precision
27+
28+
==== Overview and Guarantees
29+
30+
All message processing and backing off is handled by the consumer thread, and, as such, delay precision is guaranteed on a best-effort basis.
31+
If one message's processing takes longer than the next message's back off period for that consumer, the next message's delay will be higher than expected.
32+
Also, for short delays (about 1s or less), the maintenance work the thread has to do, such as committing offsets, may delay the message processing execution.
33+
The precision can also be affected if the retry topic's consumer is handling more than one partition, because we rely on waking up the consumer from polling and having full pollTimeouts to make timing adjustments.
34+
35+
That being said, for consumers handling a single partition the message's processing should happen under 100ms after its exact due time for most situations.
36+
37+
IMPORTANT: It is guaranteed that a message will never be processed before its due time.
38+
39+
==== Tuning the Delay Precision
40+
41+
The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
42+
Both properties will be automatically set in the retry topic's and DLT's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
43+
These values will only be set if the property still has its default value - if you change either value yourself, your change will not be overridden.
44+
This way you can tune the precision and performance for the retry topics if you need to.
45+
46+
NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.
2647

2748
=== Configuration
2849

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@
174174
TopicSuffixingStrategy topicSuffixingStrategy() default TopicSuffixingStrategy.SUFFIX_WITH_DELAY_VALUE;
175175

176176
/**
177-
* Whether or not to redeliver to the DLT if delivery fails.
177+
* Whether or not to create a DLT and redeliver to it if delivery fails, or just give up.
178178
* @return the dlt strategy.
179179
*/
180180
DltStrategy dltStrategy() default DltStrategy.ALWAYS_RETRY_ON_ERROR;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaConsumerBackoffManager.java

Lines changed: 114 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,20 @@
1818

1919
import java.time.Clock;
2020
import java.time.Instant;
21-
import java.time.temporal.ChronoUnit;
2221
import java.util.HashMap;
2322
import java.util.Map;
2423

24+
import org.apache.commons.logging.LogFactory;
25+
import org.apache.kafka.clients.consumer.Consumer;
2526
import org.apache.kafka.common.TopicPartition;
2627

2728
import org.springframework.beans.factory.annotation.Qualifier;
2829
import org.springframework.context.ApplicationListener;
30+
import org.springframework.core.log.LogAccessor;
31+
import org.springframework.core.task.TaskExecutor;
2932
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
33+
import org.springframework.lang.Nullable;
34+
import org.springframework.retry.backoff.Sleeper;
3035

3136
/**
3237
*
@@ -47,87 +52,163 @@
4752
*/
4853
public class KafkaConsumerBackoffManager implements ApplicationListener<ListenerContainerPartitionIdleEvent> {
4954

55+
private static final LogAccessor logger = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
5056
/**
5157
* Internal Back Off Clock Bean Name.
5258
*/
5359
public static final String INTERNAL_BACKOFF_CLOCK_BEAN_NAME = "internalBackOffClock";
5460

61+
private static final int TIMING_CORRECTION_THRESHOLD = 100;
62+
63+
private static final int POLL_TIMEOUTS_FOR_CORRECTION_WINDOW = 2;
64+
5565
private final ListenerContainerRegistry registry;
5666

57-
private final Map<TopicPartition, Context> backOffTimes;
67+
private final Map<TopicPartition, Context> backOffContexts;
5868

5969
private final Clock clock;
6070

71+
private final TaskExecutor taskExecutor;
72+
73+
private final Sleeper sleeper;
74+
6175
public KafkaConsumerBackoffManager(ListenerContainerRegistry registry,
62-
@Qualifier(INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
76+
@Qualifier(INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock,
77+
TaskExecutor taskExecutor,
78+
Sleeper sleeper) {
6379

6480
this.registry = registry;
6581
this.clock = clock;
66-
this.backOffTimes = new HashMap<>();
82+
this.taskExecutor = taskExecutor;
83+
this.sleeper = sleeper;
84+
this.backOffContexts = new HashMap<>();
6785
}
6886

6987
/**
7088
* Backs off if the current time is before the dueTimestamp provided
7189
* in the {@link Context} object.
72-
* @param context the state that will be used for backing off.
90+
* @param context the back off context for this execution.
7391
*/
7492
public void maybeBackoff(Context context) {
75-
long backoffTime = ChronoUnit.MILLIS.between(Instant.now(this.clock),
76-
Instant.ofEpochMilli(context.dueTimestamp));
93+
long backoffTime = context.dueTimestamp - getCurrentMillisFromClock();
7794
if (backoffTime > 0) {
7895
pauseConsumptionAndThrow(context, backoffTime);
7996
}
8097
}
8198

82-
private void pauseConsumptionAndThrow(Context context, Long timeToSleep) throws KafkaBackoffException {
99+
private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
83100
TopicPartition topicPartition = context.topicPartition;
84101
getListenerContainerFromContext(context).pausePartition(topicPartition);
85102
addBackoff(context, topicPartition);
86103
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
87104
"backing off for approx. %s millis.", context.topicPartition.partition(),
88-
context.topicPartition.topic(), timeToSleep),
105+
context.topicPartition.topic(), backOffTime),
89106
topicPartition, context.listenerId, context.dueTimestamp);
90107
}
91108

92109
@Override
93110
public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent) {
94-
Context context = getBackoff(partitionIdleEvent.getTopicPartition());
95-
if (context == null || isNotDue(context.dueTimestamp)) {
111+
logger.debug(() -> String.format("partitionIdleEvent received at %s. Partition: %s",
112+
getCurrentMillisFromClock(), partitionIdleEvent.getTopicPartition()));
113+
114+
Context backOffContext = getBackOffContext(partitionIdleEvent.getTopicPartition());
115+
116+
if (backOffContext == null) {
96117
return;
97118
}
119+
maybeResumeConsumption(backOffContext);
120+
}
121+
122+
private long getCurrentMillisFromClock() {
123+
return Instant.now(this.clock).toEpochMilli();
124+
}
125+
126+
private void maybeResumeConsumption(Context context) {
127+
long now = getCurrentMillisFromClock();
128+
long timeUntilDue = context.dueTimestamp - now;
129+
long pollTimeout = getListenerContainerFromContext(context)
130+
.getContainerProperties()
131+
.getPollTimeout();
132+
boolean isDue = timeUntilDue <= pollTimeout;
133+
134+
if (maybeApplyTimingCorrection(context, pollTimeout, timeUntilDue) || isDue) {
135+
resumePartition(context);
136+
}
137+
else {
138+
logger.debug(() -> String.format("TopicPartition %s not due. DueTimestamp: %s Now: %s ",
139+
context.topicPartition, context.dueTimestamp, now));
140+
}
141+
}
142+
143+
private void resumePartition(Context context) {
98144
MessageListenerContainer container = getListenerContainerFromContext(context);
145+
logger.debug(() -> "Resuming partition at " + getCurrentMillisFromClock());
99146
container.resumePartition(context.topicPartition);
100147
removeBackoff(context.topicPartition);
101148
}
102149

103-
private boolean isNotDue(long dueTimestamp) {
104-
return Instant.now(this.clock).isBefore(Instant.ofEpochMilli(dueTimestamp));
150+
private boolean maybeApplyTimingCorrection(Context context, long pollTimeout, long timeUntilDue) {
151+
// Correction can only be applied to ConsumerAwareMessageListener
152+
// listener instances.
153+
if (context.consumerForTimingCorrection == null) {
154+
return false;
155+
}
156+
157+
boolean isInCorrectionWindow = timeUntilDue > pollTimeout && timeUntilDue <=
158+
pollTimeout * POLL_TIMEOUTS_FOR_CORRECTION_WINDOW;
159+
160+
long correctionAmount = timeUntilDue % pollTimeout;
161+
if (isInCorrectionWindow && correctionAmount > TIMING_CORRECTION_THRESHOLD) {
162+
this.taskExecutor.execute(() -> doApplyTimingCorrection(context, correctionAmount));
163+
return true;
164+
}
165+
return false;
166+
}
167+
168+
private void doApplyTimingCorrection(Context context, long correctionAmount) {
169+
try {
170+
logger.debug(() -> String.format("Applying correction of %s millis at %s for TopicPartition %s",
171+
correctionAmount, getCurrentMillisFromClock(), context.topicPartition));
172+
this.sleeper.sleep(correctionAmount);
173+
logger.debug(() -> "Waking up consumer for partition topic: " + context.topicPartition);
174+
context.consumerForTimingCorrection.wakeup();
175+
}
176+
catch (InterruptedException e) {
177+
Thread.interrupted();
178+
throw new IllegalStateException("Interrupted waking up consumer while applying correction " +
179+
"for TopicPartition " + context.topicPartition, e);
180+
}
181+
catch (Throwable e) {
182+
logger.error(e, () -> "Error waking up consumer while applying correction " +
183+
"for TopicPartition " + context.topicPartition);
184+
}
105185
}
106186

107187
private MessageListenerContainer getListenerContainerFromContext(Context context) {
108188
return this.registry.getListenerContainer(context.listenerId);
109189
}
110190

111191
protected void addBackoff(Context context, TopicPartition topicPartition) {
112-
synchronized (this.backOffTimes) {
113-
this.backOffTimes.put(topicPartition, context);
192+
synchronized (this.backOffContexts) {
193+
this.backOffContexts.put(topicPartition, context);
114194
}
115195
}
116196

117-
protected Context getBackoff(TopicPartition topicPartition) {
118-
synchronized (this.backOffTimes) {
119-
return this.backOffTimes.get(topicPartition);
197+
protected Context getBackOffContext(TopicPartition topicPartition) {
198+
synchronized (this.backOffContexts) {
199+
return this.backOffContexts.get(topicPartition);
120200
}
121201
}
122202

123203
protected void removeBackoff(TopicPartition topicPartition) {
124-
synchronized (this.backOffTimes) {
125-
this.backOffTimes.remove(topicPartition);
204+
synchronized (this.backOffContexts) {
205+
this.backOffContexts.remove(topicPartition);
126206
}
127207
}
128208

129-
public Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition) {
130-
return new Context(dueTimestamp, listenerId, topicPartition);
209+
public Context createContext(long dueTimestamp, String listenerId, TopicPartition topicPartition,
210+
@Nullable Consumer<?, ?> consumerForTimingCorrection) {
211+
return new Context(dueTimestamp, topicPartition, listenerId, consumerForTimingCorrection);
131212
}
132213

133214
/**
@@ -140,22 +221,29 @@ public static class Context {
140221
* The time after which the message should be processed,
141222
* in milliseconds since epoch.
142223
*/
143-
final long dueTimestamp; // NOSONAR
224+
private final long dueTimestamp; // NOSONAR
144225

145226
/**
146227
* The id for the listener that should be paused.
147228
*/
148-
final String listenerId; // NOSONAR
229+
private final String listenerId; // NOSONAR
149230

150231
/**
151232
* The topic that contains the partition to be paused.
152233
*/
153-
final TopicPartition topicPartition; // NOSONAR
234+
private final TopicPartition topicPartition; // NOSONAR
235+
236+
/**
237+
* The consumer of the message, if present.
238+
*/
239+
private final Consumer<?, ?> consumerForTimingCorrection; // NOSONAR
154240

155-
Context(long dueTimestamp, String listenerId, TopicPartition topicPartition) {
241+
Context(long dueTimestamp, TopicPartition topicPartition, String listenerId,
242+
@Nullable Consumer<?, ?> consumerForTimingCorrection) {
156243
this.dueTimestamp = dueTimestamp;
157244
this.listenerId = listenerId;
158245
this.topicPartition = topicPartition;
246+
this.consumerForTimingCorrection = consumerForTimingCorrection;
159247
}
160248
}
161249
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1260,6 +1260,7 @@ protected void pollAndInvoke() {
12601260
resumeConsumerIfNeccessary();
12611261
resumePartitionsIfNecessary();
12621262
debugRecords(records);
1263+
12631264
if (records != null && records.count() > 0) {
12641265
savePositionsIfNeeded(records);
12651266
notIdle();
@@ -1268,6 +1269,9 @@ protected void pollAndInvoke() {
12681269
}
12691270
else {
12701271
checkIdle();
1272+
}
1273+
if (records == null || records.count() == 0
1274+
|| records.partitions().size() < this.consumer.assignment().size()) {
12711275
checkIdlePartitions();
12721276
}
12731277
}
@@ -2212,7 +2216,12 @@ private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> recor
22122216
commitOffsetsIfNeeded(record);
22132217
}
22142218
catch (KafkaException ke) {
2215-
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2219+
if (ke.contains(KafkaBackoffException.class)) {
2220+
this.logger.warn(ke.getMessage());
2221+
}
2222+
else {
2223+
ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
2224+
}
22162225
return ke;
22172226
}
22182227
catch (RuntimeException ee) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2018-2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import java.time.Clock;
20+
import java.time.Instant;
21+
22+
import org.springframework.kafka.KafkaException;
23+
24+
/**
25+
* A {@link KafkaException} that records the timestamp
26+
* of when it was thrown.
27+
*
28+
* @author Tomaz Fernandes
29+
* @since 2.7
30+
*/
31+
public class TimestampedException extends KafkaException {
32+
33+
private static final long serialVersionUID = -2544217643924234282L;
34+
35+
private final long timestamp;
36+
37+
public TimestampedException(Exception ex, Clock clock) {
38+
super(ex.getMessage(), ex);
39+
this.timestamp = Instant.now(clock).toEpochMilli();
40+
}
41+
42+
public TimestampedException(Exception ex) {
43+
super(ex.getMessage(), ex);
44+
this.timestamp = Instant.now(Clock.systemDefaultZone()).toEpochMilli();
45+
}
46+
47+
public long getTimestamp() {
48+
return this.timestamp;
49+
}
50+
}

0 commit comments

Comments
 (0)