Skip to content

Commit fa983fd

Browse files
committed
Fix idleBetweenPolls max value calculation
- Wrong default used when `max.poll.interval.ms` is not specified - 30s Vs 300s - Value set in `@KafkaListener.properties` ignored for this calculatio and default used instead - Also use actual default for `ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG` **I will back port - expecting conflicts** * Remove unused constant
1 parent 90c87c0 commit fa983fd

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,8 @@ public class KafkaMessageListenerContainer<K, V> // NOSONAR line count
140140

141141
private static final int DEFAULT_ACK_TIME = 5000;
142142

143+
private static final Map<String, Object> CONSUMER_CONFIG_DEFAULTS = ConsumerConfig.configDef().defaultValues();
144+
143145
private final AbstractMessageListenerContainer<K, V> thisOrParentContainer;
144146

145147
private final TopicPartitionOffset[] topicPartitions;
@@ -433,8 +435,6 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
433435

434436
private static final String ERROR_HANDLER_THREW_AN_EXCEPTION = "Error handler threw an exception";
435437

436-
private static final int SIXTY = 60;
437-
438438
private static final String UNCHECKED = "unchecked";
439439

440440
private static final String RAWTYPES = "rawtypes";
@@ -592,7 +592,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
592592

593593
@SuppressWarnings(UNCHECKED)
594594
ListenerConsumer(GenericMessageListener<?> listener, ListenerType listenerType) {
595-
Properties consumerProperties = new Properties(this.containerProperties.getKafkaConsumerProperties());
595+
Properties consumerProperties = propertiesFromProperties();
596596
checkGroupInstance(consumerProperties, KafkaMessageListenerContainer.this.consumerFactory);
597597
this.autoCommit = determineAutoCommit(consumerProperties);
598598
this.consumer =
@@ -676,6 +676,20 @@ else if (listener instanceof MessageListener) {
676676
this.subBatchPerPartition = setupSubBatchPerPartition();
677677
}
678678

679+
private Properties propertiesFromProperties() {
680+
Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
681+
Properties props = new Properties();
682+
props.putAll(propertyOverrides);
683+
Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
684+
// User might have provided properties as defaults
685+
stringPropertyNames.forEach((name) -> {
686+
if (!props.contains(name)) {
687+
props.setProperty(name, propertyOverrides.getProperty(name));
688+
}
689+
});
690+
return props;
691+
}
692+
679693
String getClientId() {
680694
return this.clientId;
681695
}
@@ -770,9 +784,9 @@ else if (timeout instanceof String) {
770784
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
771785
+ " in property '"
772786
+ ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG
773-
+ "'; defaulting to 30 seconds.");
787+
+ "'; using Kafka default.");
774788
}
775-
return Duration.ofSeconds(SIXTY / 2).toMillis(); // Default 'max.poll.interval.ms' is 30 seconds
789+
return (int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG);
776790
}
777791
}
778792

@@ -855,12 +869,12 @@ else if (timeout instanceof String) {
855869
this.logger.warn(() -> "Unexpected type: " + timeoutToLog.getClass().getName()
856870
+ " in property '"
857871
+ ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG
858-
+ "'; defaulting to 60 seconds for sync commit timeouts");
872+
+ "'; defaulting to Kafka default for sync commit timeouts");
859873
}
860-
return Duration.ofSeconds(SIXTY);
874+
return Duration
875+
.ofMillis((int) CONSUMER_CONFIG_DEFAULTS.get(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
861876
}
862877
}
863-
864878
}
865879

866880
private Object findDeserializerClass(Map<String, Object> props, boolean isValue) {

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@
9898
import org.springframework.kafka.listener.ContainerProperties;
9999
import org.springframework.kafka.listener.ContainerProperties.AckMode;
100100
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
101+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
101102
import org.springframework.kafka.listener.ListenerExecutionFailedException;
102103
import org.springframework.kafka.listener.MessageListenerContainer;
103104
import org.springframework.kafka.listener.adapter.FilteringMessageListenerAdapter;
@@ -235,6 +236,10 @@ public void testAnonymous() {
235236
List<?> containers = KafkaTestUtils.getPropertyValue(container, "containers", List.class);
236237
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.consumerGroupId"))
237238
.isEqualTo(DEFAULT_TEST_GROUP_ID);
239+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.maxPollInterval"))
240+
.isEqualTo(300000L);
241+
assertThat(KafkaTestUtils.getPropertyValue(containers.get(0), "listenerConsumer.syncCommitTimeout"))
242+
.isEqualTo(Duration.ofSeconds(60));
238243
container.stop();
239244
}
240245

@@ -370,20 +375,24 @@ public void testAutoStartup() {
370375
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout()).isNull();
371376
this.registry.start();
372377
assertThat(listenerContainer.isRunning()).isTrue();
373-
assertThat(((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
378+
KafkaMessageListenerContainer<?, ?> kafkaMessageListenerContainer =
379+
((ConcurrentMessageListenerContainer<?, ?>) listenerContainer)
374380
.getContainers()
375-
.get(0)
381+
.get(0);
382+
assertThat(kafkaMessageListenerContainer
376383
.getContainerProperties().getSyncCommitTimeout())
377-
.isEqualTo(Duration.ofSeconds(60));
384+
.isEqualTo(Duration.ofSeconds(59));
378385
assertThat(listenerContainer.getContainerProperties().getSyncCommitTimeout())
379-
.isEqualTo(Duration.ofSeconds(60));
386+
.isEqualTo(Duration.ofSeconds(59));
380387
listenerContainer.stop();
381388
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.syncCommits", Boolean.class))
382389
.isFalse();
383390
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.commitCallback"))
384391
.isNotNull();
385392
assertThat(KafkaTestUtils.getPropertyValue(listenerContainer, "containerProperties.consumerRebalanceListener"))
386393
.isNotNull();
394+
assertThat(KafkaTestUtils.getPropertyValue(kafkaMessageListenerContainer, "listenerConsumer.maxPollInterval"))
395+
.isEqualTo(301000L);
387396
}
388397

389398
@Test
@@ -1683,7 +1692,9 @@ static class Listener implements ConsumerSeekAware {
16831692
volatile CustomMethodArgument customMethodArgument;
16841693

16851694
@KafkaListener(id = "manualStart", topics = "manualStart",
1686-
containerFactory = "kafkaAutoStartFalseListenerContainerFactory")
1695+
containerFactory = "kafkaAutoStartFalseListenerContainerFactory",
1696+
properties = { ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG + ":301000",
1697+
ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG + ":59000" })
16871698
public void manualStart(String foo) {
16881699
}
16891700

0 commit comments

Comments
 (0)