
Commit cee9bed

GH-1587: Option to Correct Transactional Offsets
Resolves #1587

See javadoc for `ConsumerProperties.setFixTxOffsets()` for more information.

**cherry-pick to 2.5.x**

* Add `this.` to `logger` to honor Checkstyle
1 parent cb8183e commit cee9bed
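
To put the change in context, the option is switched on through the container's `ConsumerProperties` (in practice via `ContainerProperties`, which extends it), exactly as the updated test in this commit does. A minimal usage sketch, with placeholder topic and group names:

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;

// Minimal sketch; "someTopic" and "someGroup" are placeholders, and a message
// listener still has to be set on the properties before the container starts.
public class FixTxOffsetsConfig {

	KafkaMessageListenerContainer<Integer, String> container(ConsumerFactory<Integer, String> cf) {
		ContainerProperties containerProps = new ContainerProperties("someTopic");
		containerProps.setGroupId("someGroup");
		containerProps.setFixTxOffsets(true); // new in 2.5.6; defaults to false
		return new KafkaMessageListenerContainer<>(cf, containerProps);
	}

}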

4 files changed; 109 additions, 5 deletions

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 28 additions & 0 deletions
@@ -105,6 +105,8 @@ public class ConsumerProperties {
 
 	private int commitRetries = DEFAULT_COMMIT_RETRIES;
 
+	private boolean fixTxOffsets;
+
 	/**
 	 * Create properties for a container that will subscribe to the specified topics.
 	 * @param topics the topics.
@@ -390,6 +392,32 @@ public void setOnlyLogRecordMetadata(boolean onlyLogRecordMetadata) {
 		this.onlyLogRecordMetadata = onlyLogRecordMetadata;
 	}
 
+	/**
+	 * Whether or not to correct terminal transactional offsets.
+	 * @return true to fix.
+	 * @since 2.5.6
+	 * @see #setFixTxOffsets(boolean)
+	 */
+	public boolean isFixTxOffsets() {
+		return this.fixTxOffsets;
+	}
+
+	/**
+	 * When consuming records produced by a transactional producer, and the consumer is
+	 * positioned at the end of a partition, the lag can incorrectly be reported as
+	 * greater than zero, due to the pseudo record used to indicate transaction
+	 * commit/rollback and, possibly, the presence of rolled-back records. This does not
+	 * functionally affect the consumer but some users have expressed concern that the
+	 * "lag" is non-zero. Set this to true and the container will correct such
+	 * mis-reported offsets. The check is performed before the next poll to avoid adding
+	 * significant complexity to the commit processing.
+	 * @param fixTxOffsets true to correct the offset(s).
+	 * @since 2.5.6
+	 */
+	public void setFixTxOffsets(boolean fixTxOffsets) {
+		this.fixTxOffsets = fixTxOffsets;
+	}
+
 	@Override
 	public String toString() {
 		return "ConsumerProperties ["

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 53 additions & 2 deletions
@@ -456,6 +456,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private final Collection<TopicPartition> assignedPartitions = new LinkedHashSet<>();
 
+		private final Map<TopicPartition, OffsetAndMetadata> lastCommits = new HashMap<>();
+
 		private final GenericMessageListener<?> genericListener;
 
 		private final ConsumerSeekAware consumerSeekAwareListener;
@@ -557,6 +559,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
 
 		private final String clientId;
 
+		private final boolean fixTxOffsets = this.containerProperties.isFixTxOffsets();
+
 		private Map<TopicPartition, OffsetMetadata> definedPartitions;
 
 		private int count;
@@ -1090,6 +1094,7 @@ protected void pollAndInvoke() {
 			if (!this.autoCommit && !this.isRecordAck) {
 				processCommits();
 			}
+			fixTxOffsetsIfNeeded();
 			idleBetweenPollIfNecessary();
 			if (this.seeks.size() > 0) {
 				processSeeks();
@@ -1124,6 +1129,42 @@ protected void pollAndInvoke() {
 			}
 		}
 
+		@SuppressWarnings("rawtypes")
+		private void fixTxOffsetsIfNeeded() {
+			if (this.fixTxOffsets) {
+				try {
+					Map<TopicPartition, OffsetAndMetadata> toFix = new HashMap<>();
+					this.lastCommits.forEach((tp, oamd) -> {
+						long position = this.consumer.position(tp);
+						if (position > oamd.offset()) {
+							toFix.put(tp, new OffsetAndMetadata(position));
+						}
+					});
+					if (toFix.size() > 0) {
+						this.logger.debug(() -> "Fixing TX offsets: " + toFix);
+						if (this.transactionTemplate == null) {
+							if (this.syncCommits) {
+								commitSync(toFix);
+							}
+							else {
+								commitAsync(toFix, 0);
+							}
+						}
+						else {
+							this.transactionTemplate.executeWithoutResult(status -> {
+								doSendOffsets(((KafkaResourceHolder) TransactionSynchronizationManager
+										.getResource(this.kafkaTxManager.getProducerFactory()))
+												.getProducer(), toFix);
+							});
+						}
+					}
+				}
+				catch (Exception e) {
+					this.logger.error(e, "Failed to correct transactional offset(s)");
+				}
+			}
+		}
+
 		private ConsumerRecords<K, V> doPoll() {
 			ConsumerRecords<K, V> records;
 			if (this.isBatchListener && this.subBatchPerPartition) {
@@ -1215,8 +1256,7 @@ private void checkIdle() {
 				long now = System.currentTimeMillis();
 				if (now > this.lastReceive + this.containerProperties.getIdleEventInterval()
 						&& now > this.lastAlertAt + this.containerProperties.getIdleEventInterval()) {
-					publishIdleContainerEvent(now - this.lastReceive, this.isConsumerAwareListener
-							? this.consumer : null, this.consumerPaused);
+					publishIdleContainerEvent(now - this.lastReceive, this.consumer, this.consumerPaused);
 					this.lastAlertAt = now;
 					if (this.consumerSeekAwareListener != null) {
 						Collection<TopicPartition> partitions = getAssignedPartitions();
@@ -1392,6 +1432,9 @@ private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int ret
 				}
 				else {
 					this.commitCallback.onComplete(offsetsAttempted, exception);
+					if (this.fixTxOffsets) {
+						this.lastCommits.putAll(commits);
+					}
 				}
 			});
 		}
@@ -2022,6 +2065,9 @@ private void doSendOffsets(Producer<?, ?> prod, Map<TopicPartition, OffsetAndMet
 			else {
 				prod.sendOffsetsToTransaction(commits, this.consumer.groupMetadata());
 			}
+			if (this.fixTxOffsets) {
+				this.lastCommits.putAll(commits);
+			}
 		}
 
 		private void processCommits() {
@@ -2245,6 +2291,9 @@ private void commitSync(Map<TopicPartition, OffsetAndMetadata> commits) {
 		private void doCommitSync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
 			try {
 				this.consumer.commitSync(commits, this.syncCommitTimeout);
+				if (this.fixTxOffsets) {
+					this.lastCommits.putAll(commits);
+				}
 			}
 			catch (RetriableCommitFailedException e) {
 				if (retries >= this.containerProperties.getCommitRetries()) {
@@ -2450,6 +2499,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 				try {
 					// Wait until now to commit, in case the user listener added acks
 					commitPendingAcks();
+					fixTxOffsetsIfNeeded();
 				}
 				catch (Exception e) {
 					ListenerConsumer.this.logger.error(e, () -> "Fatal commit error after revocation "
@@ -2465,6 +2515,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
 				if (ListenerConsumer.this.assignedPartitions != null) {
 					ListenerConsumer.this.assignedPartitions.removeAll(partitions);
 				}
+				partitions.forEach(tp -> ListenerConsumer.this.lastCommits.remove(tp));
 			}
 			finally {
 				if (ListenerConsumer.this.kafkaTxManager != null) {
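
The `doSendOffsets()` branch above folds the correction into an existing transaction when a `KafkaTransactionManager` is configured. As background, the "pseudo record" mentioned in the javadoc comes from the transactional producer itself: every commit or abort writes a control record into the partitions the transaction touched, so the consumer's position ends up beyond the last data record. A rough sketch of that producer side, under assumed names and not taken from this commit:

import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: a transactional send plus offset commit; the final
// commitTransaction() appends a marker record that occupies an offset in each
// partition written to, which is what creates the apparent "lag".
final class TxMarkerOrigin {

	static void sendInTransaction(Producer<Integer, String> producer, Consumer<Integer, String> consumer,
			Map<TopicPartition, OffsetAndMetadata> offsets) {

		producer.beginTransaction();
		producer.send(new ProducerRecord<>("someTopic", "value"));
		producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
		producer.commitTransaction();
	}

}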

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 21 additions & 3 deletions
@@ -45,6 +45,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,6 +84,7 @@
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.core.ProducerFactoryUtils;
 import org.springframework.kafka.event.ConsumerStoppedEvent;
+import org.springframework.kafka.event.ListenerContainerIdleEvent;
 import org.springframework.kafka.listener.ContainerProperties.AckMode;
 import org.springframework.kafka.listener.ContainerProperties.AssignmentCommitOption;
 import org.springframework.kafka.listener.ContainerProperties.EOSMode;
@@ -501,7 +503,9 @@ public void testRollbackRecord() throws Exception {
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
 		ContainerProperties containerProps = new ContainerProperties(topic1, topic2);
 		containerProps.setGroupId("group");
-		containerProps.setPollTimeout(10_000);
+		containerProps.setPollTimeout(500L);
+		containerProps.setIdleEventInterval(500L);
+		containerProps.setFixTxOffsets(true);
 
 		Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
 		// senderProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@@ -536,6 +540,17 @@ public void testRollbackRecord() throws Exception {
 		KafkaMessageListenerContainer<Integer, String> container =
 				new KafkaMessageListenerContainer<>(cf, containerProps);
 		container.setBeanName("testRollbackRecord");
+		AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
+		CountDownLatch idleLatch = new CountDownLatch(1);
+		container.setApplicationEventPublisher(event -> {
+			if (event instanceof ListenerContainerIdleEvent) {
+				Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
+				committed.set(consumer.committed(Set.of(new TopicPartition(topic1, 0), new TopicPartition(topic1, 1))));
+				if (committed.get().get(new TopicPartition(topic1, 0)) != null) {
+					idleLatch.countDown();
+				}
+			}
+		});
 		container.start();
 
 		template.setDefaultTopic(topic1);
@@ -544,6 +559,10 @@ public void testRollbackRecord() throws Exception {
 			return null;
 		});
 		assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
+		assertThat(idleLatch.await(10, TimeUnit.SECONDS)).isTrue();
+		TopicPartition partition0 = new TopicPartition(topic1, 0);
+		assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
+		assertThat(committed.get().get(new TopicPartition(topic1, 1))).isNull();
 		container.stop();
 		Consumer<Integer, String> consumer = cf.createConsumer();
 		final CountDownLatch subsLatch = new CountDownLatch(1);
@@ -567,8 +586,7 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 		}
 		assertThat(subsLatch.await(1, TimeUnit.MILLISECONDS)).isTrue();
 		assertThat(records.count()).isEqualTo(0);
-		// depending on timing, the position might include the offset representing the commit in the log
-		assertThat(consumer.position(new TopicPartition(topic1, 0))).isGreaterThanOrEqualTo(1L);
+		assertThat(consumer.position(partition0)).isEqualTo(2L);
 		assertThat(transactionalId.get()).startsWith("rr.group.txTopic");
 		assertThat(KafkaTestUtils.getPropertyValue(pf, "consumerProducers", Map.class)).isEmpty();
 		logger.info("Stop testRollbackRecord");

src/reference/asciidoc/kafka.adoc

Lines changed: 7 additions & 0 deletions
@@ -2293,6 +2293,13 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
 |`BETA`
 |Exactly Once Semantics mode; see <<exactly-once>>.
 
+|fixTxOffsets
+|`false`
+|When consuming records produced by a transactional producer, and the consumer is positioned at the end of a partition, the lag can incorrectly be reported as greater than zero, due to the pseudo record used to indicate transaction commit/rollback and, possibly, the presence of rolled-back records.
+This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
+Set this property to `true` and the container will correct such mis-reported offsets.
+The check is performed before the next poll to avoid adding significant complexity to the commit processing.
+
 |groupId
 |`null`
 |Overrides the consumer `group.id` property; automatically set by the `@KafkaListener` `id` or `groupId` property.
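
For `@KafkaListener` endpoints, the property documented above would normally be set on the listener container factory rather than on a hand-built container. A hedged configuration sketch (bean name and generic types are illustrative):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Illustrative configuration: propagate fixTxOffsets to all containers created
// by the factory; the flag only matters when consuming transactional topics.
@Configuration
public class ListenerConfig {

	@Bean
	ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory(
			ConsumerFactory<Integer, String> consumerFactory) {

		ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
				new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory);
		factory.getContainerProperties().setFixTxOffsets(true);
		return factory;
	}

}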
