Skip to content

Commit 9c252be

Browse files
authored
Fix Kafka Send Timeout
Resolves spring-attic/spring-cloud-stream-binder-kafka#928 Kafka has a longer default send timeout; this means a send could be successful long after Spring has timed out the send. **I will backport to the 3.3.x extension after merge** * Use setSendTimeout; make it final.
1 parent a19e372 commit 9c252be

File tree

6 files changed

+83
-17
lines changed

6 files changed

+83
-17
lines changed

spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.atomic.AtomicBoolean;
3131

3232
import org.apache.kafka.clients.consumer.ConsumerRecord;
33+
import org.apache.kafka.clients.producer.ProducerConfig;
3334
import org.apache.kafka.clients.producer.ProducerRecord;
3435
import org.apache.kafka.common.TopicPartition;
3536
import org.apache.kafka.common.header.Headers;
@@ -101,7 +102,10 @@
101102
public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMessageHandler
102103
implements Lifecycle {
103104

104-
private static final long DEFAULT_SEND_TIMEOUT = 10000;
105+
/**
106+
* Buffer added to ensure our timeout is longer than Kafka's.
107+
*/
108+
private static final int TIMEOUT_BUFFER = 5000;
105109

106110
private final Map<String, Set<Integer>> replyTopicsAndPartitions = new HashMap<>();
107111

@@ -115,6 +119,8 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
115119

116120
private final AtomicBoolean running = new AtomicBoolean();
117121

122+
private final long deliveryTimeoutMsProperty;
123+
118124
private EvaluationContext evaluationContext;
119125

120126
private Expression topicExpression;
@@ -130,7 +136,7 @@ public class KafkaProducerMessageHandler<K, V> extends AbstractReplyProducingMes
130136

131137
private boolean sync;
132138

133-
private Expression sendTimeoutExpression = new ValueExpression<>(DEFAULT_SEND_TIMEOUT);
139+
private Expression sendTimeoutExpression;
134140

135141
private KafkaHeaderMapper headerMapper;
136142

@@ -175,6 +181,25 @@ public KafkaProducerMessageHandler(final KafkaTemplate<K, V> kafkaTemplate) {
175181
logger.warn("The KafkaTemplate is transactional; this gateway will only work if the consumer is "
176182
+ "configured to read uncommitted records");
177183
}
184+
determineSendTimeout();
185+
this.deliveryTimeoutMsProperty = this.sendTimeoutExpression.getValue(Long.class) - TIMEOUT_BUFFER;
186+
}
187+
188+
private void determineSendTimeout() {
189+
Map<String, Object> props = this.kafkaTemplate.getProducerFactory().getConfigurationProperties();
190+
Object dt = props.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
191+
if (dt == null) {
192+
dt = ProducerConfig.configDef().defaultValues().get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
193+
}
194+
if (dt instanceof Long) {
195+
setSendTimeout(((Long) dt) + TIMEOUT_BUFFER);
196+
}
197+
else if (dt instanceof Integer) {
198+
setSendTimeout(Long.valueOf((Integer) dt) + TIMEOUT_BUFFER);
199+
}
200+
else if (dt instanceof String) {
201+
setSendTimeout(Long.parseLong((String) dt) + TIMEOUT_BUFFER);
202+
}
178203
}
179204

180205
public void setTopicExpression(Expression topicExpression) {
@@ -243,24 +268,25 @@ public void setSync(boolean sync) {
243268

244269
/**
245270
* Specify a timeout in milliseconds for how long this
246-
* {@link KafkaProducerMessageHandler} should wait wait for send operation
247-
* results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode.
248-
* Also applies when sending to the success or failure channels.
249-
* @param sendTimeout the timeout to wait for result fo send operation.
271+
* {@link KafkaProducerMessageHandler} should wait wait for send operation results.
272+
* Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout
273+
* is applied Also applies when sending to the success or failure channels.
274+
* @param sendTimeout the timeout to wait for result for a send operation.
250275
* @since 2.0.1
251276
*/
252277
@Override
253-
public void setSendTimeout(long sendTimeout) {
278+
public final void setSendTimeout(long sendTimeout) {
254279
super.setSendTimeout(sendTimeout);
255280
setSendTimeoutExpression(new ValueExpression<>(sendTimeout));
256281
}
257282

258283
/**
259284
* Specify a SpEL expression to evaluate a timeout in milliseconds for how long this
260-
* {@link KafkaProducerMessageHandler} should wait wait for send operation
261-
* results. Defaults to 10 seconds. The timeout is applied only in {@link #sync} mode.
285+
* {@link KafkaProducerMessageHandler} should wait wait for send operation results.
286+
* Defaults to the kafka {@code delivery.timeout.ms} property + 5 seconds. The timeout
287+
* is applied only in {@link #sync} mode.
262288
* @param sendTimeoutExpression the {@link Expression} for timeout to wait for result
263-
* fo send operation.
289+
* for a send operation.
264290
* @since 2.1.1
265291
*/
266292
public void setSendTimeoutExpression(Expression sendTimeoutExpression) {
@@ -591,8 +617,15 @@ public void onFailure(Throwable ex) {
591617
});
592618
}
593619

594-
if (this.sync) {
620+
if (this.sync || this.isGateway) {
595621
Long sendTimeout = this.sendTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
622+
if (sendTimeout != null && sendTimeout <= this.deliveryTimeoutMsProperty) {
623+
this.logger.debug("'sendTimeout' increased to "
624+
+ (this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER)
625+
+ "ms; it must be greater than the 'delivery.timeout.ms' Kafka producer "
626+
+ "property to avoid false failures");
627+
sendTimeout = this.deliveryTimeoutMsProperty + TIMEOUT_BUFFER;
628+
}
596629
if (sendTimeout == null || sendTimeout < 0) {
597630
future.get();
598631
}

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests-context.xml

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@
3838

3939
<bean id="ems" class="org.springframework.integration.kafka.config.xml.KafkaOutboundGatewayParserTests$EMS"/>
4040

41-
<bean id="template" class="org.mockito.Mockito" factory-method="mock">
42-
<constructor-arg value="org.springframework.kafka.requestreply.ReplyingKafkaTemplate"/>
43-
</bean>
41+
<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper"/>
4442

45-
<bean id="customHeaderMapper" class="org.springframework.kafka.support.DefaultKafkaHeaderMapper" />
4643
</beans>

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/config/xml/KafkaOutboundGatewayParserTests.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,24 @@
1717
package org.springframework.integration.kafka.config.xml;
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
20+
import static org.mockito.BDDMockito.given;
21+
import static org.mockito.Mockito.mock;
22+
23+
import java.util.HashMap;
24+
import java.util.Map;
2025

2126
import org.junit.jupiter.api.Test;
2227

2328
import org.springframework.beans.factory.annotation.Autowired;
2429
import org.springframework.beans.factory.annotation.Qualifier;
2530
import org.springframework.context.ApplicationContext;
31+
import org.springframework.context.annotation.Bean;
2632
import org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler;
2733
import org.springframework.integration.support.DefaultErrorMessageStrategy;
2834
import org.springframework.integration.test.util.TestUtils;
35+
import org.springframework.kafka.core.KafkaTemplate;
36+
import org.springframework.kafka.core.ProducerFactory;
37+
import org.springframework.stereotype.Component;
2938
import org.springframework.test.annotation.DirtiesContext;
3039
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
3140

@@ -74,8 +83,20 @@ public void testProps() {
7483
.isSameAs(this.context.getBean("customHeaderMapper"));
7584
}
7685

86+
@Component
7787
public static class EMS extends DefaultErrorMessageStrategy {
7888

89+
@SuppressWarnings("rawtypes")
90+
@Bean
91+
public KafkaTemplate template() {
92+
ProducerFactory pf = mock(ProducerFactory.class);
93+
Map<String, Object> props = new HashMap<>();
94+
given(pf.getConfigurationProperties()).willReturn(props);
95+
KafkaTemplate template = mock(KafkaTemplate.class);
96+
given(template.getProducerFactory()).willReturn(pf);
97+
return template;
98+
}
99+
79100
}
80101

81102
}

spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
5555
import org.apache.kafka.clients.producer.Callback;
5656
import org.apache.kafka.clients.producer.Producer;
57+
import org.apache.kafka.clients.producer.ProducerConfig;
5758
import org.apache.kafka.clients.producer.ProducerRecord;
5859
import org.apache.kafka.clients.producer.RecordMetadata;
5960
import org.apache.kafka.common.TopicPartition;
@@ -147,11 +148,14 @@ static void tearDown() {
147148

148149
@Test
149150
void testOutbound() {
150-
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(
151-
KafkaTestUtils.producerProps(embeddedKafka));
151+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
152+
producerProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 40_000);
153+
DefaultKafkaProducerFactory<Integer, String> producerFactory = new DefaultKafkaProducerFactory<>(producerProps);
152154
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(producerFactory);
153155
KafkaProducerMessageHandler<Integer, String> handler = new KafkaProducerMessageHandler<>(template);
154156
handler.setBeanFactory(mock(BeanFactory.class));
157+
handler.setSendTimeout(50_000);
158+
handler.setSync(true);
155159
handler.afterPropertiesSet();
156160

157161
Message<?> message = MessageBuilder.withPayload("foo")

src/reference/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,10 @@ Flushing after sending several messages might be useful if you are using the `li
7373
By default, the expression looks for a `Boolean` value in the `KafkaIntegrationHeaders.FLUSH` header (`kafka_flush`).
7474
The flush will occur if the value is `true` and not if it's `false` or the header is absent.
7575

76+
Starting with version 5.4, the `KafkaProducerMessageHandler` `sendTimeoutExpression` default has changed from 10 seconds to the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
77+
This has been changed for consistency because you may get unexpected behavior (Spring may timeout the send, while it is actually, eventually, successful).
78+
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
79+
7680
==== Java Configuration
7781

7882
The following example shows how to configure the Kafka outbound channel adapter with Java:
@@ -425,6 +429,10 @@ If your code invokes the gateway behind a synchronous https://docs.spring.io/spr
425429
IMPORTANT: The gateway does not accept requests until the reply container has been assigned its topics and partitions.
426430
It is suggested that you add a `ConsumerRebalanceListener` to the template's reply container properties and wait for the `onPartitionsAssigned` call before sending messages to the gateway.
427431

432+
Starting with version 5.4, the `KafkaProducerMessageHandler` `sendTimeoutExpression` default has changed from 10 seconds to the `delivery.timeout.ms` Kafka producer property `+ 5000` so that the actual Kafka error after a timeout is propagated to the application, instead of a timeout generated by this framework.
433+
This has been changed for consistency because you may get unexpected behavior (Spring may timeout the send, while it is actually, eventually, successful).
434+
IMPORTANT: That timeout is 120 seconds by default so you may wish to reduce it to get more timely failures.
435+
428436
==== Java Configuration
429437

430438
The following example shows how to configure a gateway with Java:

src/reference/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ If you are interested in more details, see the Issue Tracker tickets that were r
2020
The standalone https://projects.spring.io/spring-integration-kafka/[Spring Integration Kafka] project has been merged as a `spring-integration-kafka` module to this project.
2121
See <<./kafka.adoc#kafka,Spring for Apache Kafka Support>> for more information.
2222

23+
The `KafkaProducerMessageHandler` `sendTimeoutExpression` default has changed.
24+
See <<./kafka.adoc#kafka-outbound,Kafka Outbound Channel Adapter>> for more information.
25+
2326
==== R2DBC Channel Adapters
2427

2528
The Channel Adapters for R2DBC database interaction have been introduced.

0 commit comments

Comments
 (0)