Skip to content

Commit 620c1a6

Browse files
garyrussellartembilan
authored andcommitted
GH-1587: Fix NPE with Foreign TM and fixTxOffsets
Resolves #1587 Code was using the presence of a transaction template instead of the KTM. **cherry-pick to 2.5.x**
1 parent 9bb4334 commit 620c1a6

File tree

4 files changed

+42
-18
lines changed

4 files changed

+42
-18
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -410,9 +410,10 @@ public boolean isFixTxOffsets() {
410410
* functionally affect the consumer but some users have expressed concern that the
411411
* "lag" is non-zero. Set this to true and the container will correct such
412412
* mis-reported offsets. The check is performed before the next poll to avoid adding
413-
* significant complexity to the commit processing. IMPORTANT: The lag will only be
414-
* corrected if the consumer is configured with
415-
* {@code isolation.level=read_committed}.
413+
* significant complexity to the commit processing. IMPORTANT: At the time of writing,
414+
* the lag will only be corrected if the consumer is configured with
415+
* {@code isolation.level=read_committed} and {@code max.poll.records} is greater than
416+
* 1. See https://issues.apache.org/jira/browse/KAFKA-10683 for more information.
416417
* @param fixTxOffsets true to correct the offset(s).
417418
* @since 2.5.6
418419
*/

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1186,7 +1186,7 @@ private void fixTxOffsetsIfNeeded() {
11861186
});
11871187
if (toFix.size() > 0) {
11881188
this.logger.debug(() -> "Fixing TX offsets: " + toFix);
1189-
if (this.transactionTemplate == null) {
1189+
if (this.kafkaTxManager == null) {
11901190
if (this.syncCommits) {
11911191
commitSync(toFix);
11921192
}

spring-kafka/src/test/java/org/springframework/kafka/listener/TransactionalContainerTests.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@
118118
*/
119119
@EmbeddedKafka(topics = { TransactionalContainerTests.topic1, TransactionalContainerTests.topic2,
120120
TransactionalContainerTests.topic3, TransactionalContainerTests.topic3DLT, TransactionalContainerTests.topic4,
121-
TransactionalContainerTests.topic5 },
121+
TransactionalContainerTests.topic5, TransactionalContainerTests.topic6, TransactionalContainerTests.topic7 },
122122
brokerProperties = { "transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1" })
123123
public class TransactionalContainerTests {
124124

@@ -136,6 +136,10 @@ public class TransactionalContainerTests {
136136

137137
public static final String topic5 = "txTopic5";
138138

139+
public static final String topic6 = "txTopic6";
140+
141+
public static final String topic7 = "txTopic7";
142+
139143
private static EmbeddedKafkaBroker embeddedKafka;
140144

141145
@BeforeAll
@@ -597,55 +601,73 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
597601
consumer.close();
598602
}
599603

600-
@SuppressWarnings("unchecked")
601604
@Test
602-
public void testFixLag() throws Exception {
605+
public void testFixLag() throws InterruptedException {
606+
testFixLagGuts(topic5, 0);
607+
}
608+
609+
@Test
610+
public void testFixLagKTM() throws InterruptedException {
611+
testFixLagGuts(topic6, 1);
612+
}
613+
614+
@Test
615+
public void testFixLagOtherTM() throws InterruptedException {
616+
testFixLagGuts(topic7, 2);
617+
}
618+
619+
@SuppressWarnings("unchecked")
620+
private void testFixLagGuts(String topic, int whichTm) throws InterruptedException {
603621
logger.info("Start testFixLag");
604622
Map<String, Object> props = KafkaTestUtils.consumerProps("txTest2", "false", embeddedKafka);
605623
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
606624
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(props);
607-
ContainerProperties containerProps = new ContainerProperties(topic5);
625+
ContainerProperties containerProps = new ContainerProperties(topic);
608626
containerProps.setGroupId("txTest2");
609627
containerProps.setPollTimeout(500L);
610628
containerProps.setIdleEventInterval(500L);
611629
containerProps.setFixTxOffsets(true);
612-
613630
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
614631
senderProps.put(ProducerConfig.RETRIES_CONFIG, 1);
615632
DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
616633
pf.setTransactionIdPrefix("fl.");
634+
switch (whichTm) {
635+
case 0:
636+
break;
637+
case 1:
638+
containerProps.setTransactionManager(new KafkaTransactionManager<>(pf));
639+
break;
640+
case 2:
641+
containerProps.setTransactionManager(new SomeOtherTransactionManager());
642+
}
617643

618644
final KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
619645
final CountDownLatch latch = new CountDownLatch(1);
620-
final AtomicReference<String> transactionalId = new AtomicReference<>();
621646
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
622647
});
623648

624-
@SuppressWarnings({ "rawtypes" })
625-
KafkaTransactionManager tm = new KafkaTransactionManager(pf);
626-
containerProps.setTransactionManager(tm);
627649
KafkaMessageListenerContainer<Integer, String> container =
628650
new KafkaMessageListenerContainer<>(cf, containerProps);
629651
container.setBeanName("testRollbackRecord");
630652
AtomicReference<Map<TopicPartition, OffsetAndMetadata>> committed = new AtomicReference<>();
631653
container.setApplicationEventPublisher(event -> {
632654
if (event instanceof ListenerContainerIdleEvent) {
633655
Consumer<?, ?> consumer = ((ListenerContainerIdleEvent) event).getConsumer();
634-
committed.set(consumer.committed(Set.of(new TopicPartition(topic5, 0))));
635-
if (committed.get().get(new TopicPartition(topic5, 0)) != null) {
656+
committed.set(consumer.committed(Set.of(new TopicPartition(topic, 0))));
657+
if (committed.get().get(new TopicPartition(topic, 0)) != null) {
636658
latch.countDown();
637659
}
638660
}
639661
});
640662
container.start();
641663

642-
template.setDefaultTopic(topic5);
664+
template.setDefaultTopic(topic);
643665
template.executeInTransaction(t -> {
644666
template.sendDefault(0, 0, "foo");
645667
return null;
646668
});
647669
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
648-
TopicPartition partition0 = new TopicPartition(topic5, 0);
670+
TopicPartition partition0 = new TopicPartition(topic, 0);
649671
assertThat(committed.get().get(partition0).offset()).isEqualTo(2L);
650672
container.stop();
651673
pf.destroy();

src/reference/asciidoc/kafka.adoc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2300,7 +2300,8 @@ The default executor creates threads named `<name>-C-n`; with the `KafkaMessageL
23002300
This does not functionally affect the consumer but some users have expressed concern that the "lag" is non-zero.
23012301
Set this property to `true` and the container will correct such mis-reported offsets.
23022302
The check is performed before the next poll to avoid adding significant complexity to the commit processing.
2303-
The lag will only be corrected if the consumer is configured with `isolation.level=read_committed`.
2303+
At the time of writing, the lag will only be corrected if the consumer is configured with `isolation.level=read_committed` and `max.poll.records` is greater than 1.
2304+
See https://issues.apache.org/jira/browse/KAFKA-10683[KAFKA-10683] for more information.
23042305

23052306
|groupId
23062307
|`null`

0 commit comments

Comments
 (0)