Skip to content

Commit 9722b84

Browse files
garyrussellartembilan
authored andcommitted
GH-2076: Fix Async Commit Retries
#2076 Do not attempt to retry asynchronous commits. - a later commit for the same topic/partition may have already succeeded - many consecutive retryable exceptions can cause a stack overflow **cherry-pick to 2.8.x, 2.7.x** * Remove unused parameter; polish javadocs. # Conflicts: # spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java
1 parent 4637d2f commit 9722b84

File tree

3 files changed

+23
-48
lines changed

3 files changed

+23
-48
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2021 the original author or authors.
2+
* Copyright 2019-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -293,6 +293,7 @@ public OffsetCommitCallback getCommitCallback() {
293293
* @see #setSyncCommitTimeout(Duration)
294294
* @see #setCommitCallback(OffsetCommitCallback)
295295
* @see #setCommitLogLevel(org.springframework.kafka.support.LogIfLevelEnabled.Level)
296+
* @see #setCommitRetries(int)
296297
*/
297298
public void setSyncCommits(boolean syncCommits) {
298299
this.syncCommits = syncCommits;
@@ -408,9 +409,10 @@ public void setAuthExceptionRetryInterval(Duration authExceptionRetryInterval) {
408409
/**
409410
* The number of retries allowed when a
410411
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
411-
* by the consumer.
412+
* by the consumer when using {@link #setSyncCommits(boolean)} set to true.
412413
* @return the number of retries.
413414
* @since 2.3.9
415+
* @see #setSyncCommits(boolean)
414416
*/
415417
public int getCommitRetries() {
416418
return this.commitRetries;
@@ -419,9 +421,11 @@ public int getCommitRetries() {
419421
/**
420422
* Set number of retries allowed when a
421423
* {@link org.apache.kafka.clients.consumer.RetriableCommitFailedException} is thrown
422-
* by the consumer. Default 3 (4 attempts total).
424+
* by the consumer when using {@link #setSyncCommits(boolean)} set to true. Default 3
425+
* (4 attempts total).
423426
* @param commitRetries the commitRetries.
424427
* @since 2.3.9
428+
* @see #setSyncCommits(boolean)
425429
*/
426430
public void setCommitRetries(int commitRetries) {
427431
this.commitRetries = commitRetries;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1452,7 +1452,7 @@ private void fixTxOffsetsIfNeeded() {
14521452
commitSync(toFix);
14531453
}
14541454
else {
1455-
commitAsync(toFix, 0);
1455+
commitAsync(toFix);
14561456
}
14571457
}
14581458
else {
@@ -1912,7 +1912,7 @@ else if (this.syncCommits) {
19121912
commitSync(commits);
19131913
}
19141914
else {
1915-
commitAsync(commits, 0);
1915+
commitAsync(commits);
19161916
}
19171917
}
19181918

@@ -1931,18 +1931,14 @@ else if (this.syncCommits) {
19311931
commitSync(commits);
19321932
}
19331933
else {
1934-
commitAsync(commits, 0);
1934+
commitAsync(commits);
19351935
}
19361936
}
19371937

1938-
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits, int retries) {
1938+
private void commitAsync(Map<TopicPartition, OffsetAndMetadata> commits) {
19391939
this.consumer.commitAsync(commits, (offsetsAttempted, exception) -> {
1940-
if (exception instanceof RetriableCommitFailedException
1941-
&& retries < this.containerProperties.getCommitRetries()) {
1942-
commitAsync(commits, retries + 1);
1943-
}
1944-
else {
1945-
this.commitCallback.onComplete(offsetsAttempted, exception);
1940+
this.commitCallback.onComplete(offsetsAttempted, exception);
1941+
if (exception == null) {
19461942
if (this.fixTxOffsets) {
19471943
this.lastCommits.putAll(commits);
19481944
}
@@ -2701,7 +2697,7 @@ public void ackCurrent(final ConsumerRecord<K, V> record) {
27012697
commitSync(offsetsToCommit);
27022698
}
27032699
else {
2704-
commitAsync(offsetsToCommit, 0);
2700+
commitAsync(offsetsToCommit);
27052701
}
27062702
}
27072703
else {
@@ -2963,7 +2959,7 @@ private void commitIfNecessary() {
29632959
commitSync(commits);
29642960
}
29652961
else {
2966-
commitAsync(commits, 0);
2962+
commitAsync(commits);
29672963
}
29682964
}
29692965
catch (@SuppressWarnings(UNUSED) WakeupException e) {
@@ -3360,7 +3356,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
33603356
}
33613357
}
33623358
else {
3363-
commitAsync(offsetsToCommit, 0);
3359+
commitAsync(offsetsToCommit);
33643360
}
33653361
}
33663362
}

spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java

Lines changed: 7 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@
7070
import org.apache.kafka.clients.consumer.KafkaConsumer;
7171
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
7272
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
73-
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
7473
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
7574
import org.apache.kafka.clients.producer.ProducerConfig;
7675
import org.apache.kafka.common.TopicPartition;
@@ -3380,17 +3379,8 @@ public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
33803379
}
33813380

33823381
@Test
3383-
void testCommitSyncRetries() throws Exception {
3384-
testCommitRetriesGuts(true);
3385-
}
3386-
3387-
@Test
3388-
void testCommitAsyncRetries() throws Exception {
3389-
testCommitRetriesGuts(false);
3390-
}
3391-
33923382
@SuppressWarnings({ "unchecked", "rawtypes" })
3393-
private void testCommitRetriesGuts(boolean sync) throws Exception {
3383+
void testCommitSyncRetries() throws Exception {
33943384
ConsumerFactory<Integer, String> cf = mock(ConsumerFactory.class);
33953385
Consumer<Integer, String> consumer = mock(Consumer.class);
33963386
given(cf.createConsumer(eq("grp"), eq("clientId"), isNull(), any())).willReturn(consumer);
@@ -3409,24 +3399,14 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
34093399
return first.getAndSet(false) ? consumerRecords : emptyRecords;
34103400
});
34113401
CountDownLatch latch = new CountDownLatch(4);
3412-
if (sync) {
3413-
willAnswer(i -> {
3414-
latch.countDown();
3415-
throw new RetriableCommitFailedException("");
3416-
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
3417-
}
3418-
else {
3419-
willAnswer(i -> {
3420-
OffsetCommitCallback callback = i.getArgument(1);
3421-
callback.onComplete(i.getArgument(0), new RetriableCommitFailedException(""));
3422-
latch.countDown();
3423-
return null;
3424-
}).given(consumer).commitAsync(anyMap(), any());
3425-
}
34263402
TopicPartitionOffset[] topicPartition = new TopicPartitionOffset[] {
34273403
new TopicPartitionOffset("foo", 0) };
34283404
ContainerProperties containerProps = new ContainerProperties(topicPartition);
3429-
containerProps.setSyncCommits(sync);
3405+
willAnswer(i -> {
3406+
latch.countDown();
3407+
throw new RetriableCommitFailedException("");
3408+
}).given(consumer).commitSync(anyMap(), eq(Duration.ofSeconds(45)));
3409+
containerProps.setSyncCommits(true);
34303410
containerProps.setGroupId("grp");
34313411
containerProps.setClientId("clientId");
34323412
containerProps.setIdleEventInterval(100L);
@@ -3438,12 +3418,7 @@ private void testCommitRetriesGuts(boolean sync) throws Exception {
34383418
container.start();
34393419
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
34403420
container.stop();
3441-
if (sync) {
3442-
verify(consumer, times(4)).commitSync(any(), any());
3443-
}
3444-
else {
3445-
verify(consumer, times(4)).commitAsync(any(), any());
3446-
}
3421+
verify(consumer, times(4)).commitSync(any(), any());
34473422
}
34483423

34493424
@Test

0 commit comments

Comments
 (0)