Skip to content

Commit 0a0cd35

Browse files
garyrussellartembilan
authored andcommitted
GH-748: Handle Kotlin/Lombok Message Listeners
See #748 (comment) (issue not resolved by this) Kotlin and Lombok (`@SneakyThrows`) can throw checked exceptions even though the method signatures do not allow it. Such exceptions were not passed to the error handlers until the failed record is no longer in scope. They were called as if the consumer itself threw an exception. Always wrap listener exceptions in `ListenerExecutionFailedException`s. Also remove Java9 constructs in `@Deprecation`s. **cherry-pick to 2.6.x, 2.5.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
1 parent d47d178 commit 0a0cd35

File tree

4 files changed

+169
-64
lines changed

4 files changed

+169
-64
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,17 @@ public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>>
121121
return this.sender.send(records);
122122
}
123123

124-
public Mono<Void> flush() {
124+
/**
125+
* Flush the producer.
126+
* @return {@link Mono#empty()}.
127+
* @deprecated - flush does not make sense in the context of a reactive flow since,
128+
* the send completion signal is a send result, which implies that a flush is
129+
* redundant. If you use this method with reactor-kafka 1.3 or later, it must be
130+
* scheduled to avoid a deadlock; see
131+
* https://issues.apache.org/jira/browse/KAFKA-10790 (since 2.7).
132+
*/
133+
@Deprecated
134+
public Mono<?> flush() {
125135
return doOnProducer(producer -> {
126136
producer.flush();
127137
return null;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,10 +1583,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
15831583
}
15841584

15851585
private void batchAfterRollback(final ConsumerRecords<K, V> records,
1586-
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
1586+
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
15871587
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
15881588

1589-
RuntimeException rollbackException = decorateException(e);
15901589
try {
15911590
if (recordList == null) {
15921591
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer, rollbackException,
@@ -1748,32 +1747,37 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
17481747
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
17491748
List<ConsumerRecord<K, V>> recordList) {
17501749

1751-
switch (this.listenerType) {
1752-
case ACKNOWLEDGING_CONSUMER_AWARE:
1753-
this.batchListener.onMessage(recordList,
1754-
this.isAnyManualAck
1755-
? new ConsumerBatchAcknowledgment(records)
1756-
: null, this.consumer);
1757-
break;
1758-
case ACKNOWLEDGING:
1759-
this.batchListener.onMessage(recordList,
1760-
this.isAnyManualAck
1761-
? new ConsumerBatchAcknowledgment(records)
1762-
: null);
1763-
break;
1764-
case CONSUMER_AWARE:
1765-
this.batchListener.onMessage(recordList, this.consumer);
1766-
break;
1767-
case SIMPLE:
1768-
this.batchListener.onMessage(recordList);
1769-
break;
1750+
try {
1751+
switch (this.listenerType) {
1752+
case ACKNOWLEDGING_CONSUMER_AWARE:
1753+
this.batchListener.onMessage(recordList,
1754+
this.isAnyManualAck
1755+
? new ConsumerBatchAcknowledgment(records)
1756+
: null, this.consumer);
1757+
break;
1758+
case ACKNOWLEDGING:
1759+
this.batchListener.onMessage(recordList,
1760+
this.isAnyManualAck
1761+
? new ConsumerBatchAcknowledgment(records)
1762+
: null);
1763+
break;
1764+
case CONSUMER_AWARE:
1765+
this.batchListener.onMessage(recordList, this.consumer);
1766+
break;
1767+
case SIMPLE:
1768+
this.batchListener.onMessage(recordList);
1769+
break;
1770+
}
1771+
}
1772+
catch (Exception ex) { // NOSONAR
1773+
throw decorateException(ex);
17701774
}
17711775
}
17721776

17731777
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
1774-
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException e) {
1778+
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
17751779

1776-
this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
1780+
this.batchErrorHandler.handle(rte, records, this.consumer,
17771781
KafkaMessageListenerContainer.this.thisOrParentContainer,
17781782
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
17791783
}
@@ -1832,9 +1836,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
18321836
}
18331837
break;
18341838
}
1835-
catch (RuntimeException e) {
1836-
this.logger.error(e, "Transaction rolled back");
1837-
recordAfterRollback(iterator, record, decorateException(e));
1839+
catch (RuntimeException ex) {
1840+
this.logger.error(ex, "Transaction rolled back");
1841+
recordAfterRollback(iterator, record, ex);
18381842
}
18391843
finally {
18401844
if (this.producerPerConsumerPartition) {
@@ -2034,31 +2038,36 @@ record = this.recordInterceptor.intercept(record);
20342038
+ ListenerUtils.recordToString(recordArg));
20352039
}
20362040
else {
2037-
switch (this.listenerType) {
2038-
case ACKNOWLEDGING_CONSUMER_AWARE:
2039-
this.listener.onMessage(record,
2040-
this.isAnyManualAck
2041-
? new ConsumerAcknowledgment(record)
2042-
: null, this.consumer);
2043-
break;
2044-
case CONSUMER_AWARE:
2045-
this.listener.onMessage(record, this.consumer);
2046-
break;
2047-
case ACKNOWLEDGING:
2048-
this.listener.onMessage(record,
2049-
this.isAnyManualAck
2050-
? new ConsumerAcknowledgment(record)
2051-
: null);
2052-
break;
2053-
case SIMPLE:
2054-
this.listener.onMessage(record);
2055-
break;
2041+
try {
2042+
switch (this.listenerType) {
2043+
case ACKNOWLEDGING_CONSUMER_AWARE:
2044+
this.listener.onMessage(record,
2045+
this.isAnyManualAck
2046+
? new ConsumerAcknowledgment(record)
2047+
: null, this.consumer);
2048+
break;
2049+
case CONSUMER_AWARE:
2050+
this.listener.onMessage(record, this.consumer);
2051+
break;
2052+
case ACKNOWLEDGING:
2053+
this.listener.onMessage(record,
2054+
this.isAnyManualAck
2055+
? new ConsumerAcknowledgment(record)
2056+
: null);
2057+
break;
2058+
case SIMPLE:
2059+
this.listener.onMessage(record);
2060+
break;
2061+
}
2062+
}
2063+
catch (Exception ex) { // NOSONAR
2064+
throw decorateException(ex);
20562065
}
20572066
}
20582067
}
20592068

20602069
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
2061-
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
2070+
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
20622071

20632072
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
20642073
if (this.producer == null) {
@@ -2069,30 +2078,33 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
20692078
while (iterator.hasNext()) {
20702079
records.add(iterator.next());
20712080
}
2072-
this.errorHandler.handle(decorateException(e), records, this.consumer,
2081+
this.errorHandler.handle(rte, records, this.consumer,
20732082
KafkaMessageListenerContainer.this.thisOrParentContainer);
20742083
}
20752084
else {
2076-
this.errorHandler.handle(decorateException(e), record, this.consumer);
2085+
this.errorHandler.handle(rte, record, this.consumer);
20772086
}
20782087
}
20792088

2080-
private RuntimeException decorateException(RuntimeException e) {
2081-
RuntimeException toHandle = e;
2089+
private RuntimeException decorateException(Exception ex) {
2090+
Exception toHandle = ex;
20822091
if (toHandle instanceof ListenerExecutionFailedException) {
20832092
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
20842093
toHandle.getCause());
20852094
}
20862095
else {
20872096
toHandle = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, toHandle);
20882097
}
2089-
return toHandle;
2098+
return (RuntimeException) toHandle;
20902099
}
20912100

20922101
public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
20932102
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
20942103
if (exception != null) {
2095-
throw exception;
2104+
/*
2105+
* Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
2106+
*/
2107+
throw decorateException(exception);
20962108
}
20972109
}
20982110

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@ public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
153153
* @param lastIntervals a thread local containing the previous {@link BackOff}
154154
* interval for this thread.
155155
* @since 2.3.12
156+
* @deprecated since 2.7 in favor of
157+
* {@link #unrecoverableBackOff(BackOff, ThreadLocal, ThreadLocal, MessageListenerContainer)}.
156158
*/
159+
@Deprecated
157160
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
158161
ThreadLocal<Long> lastIntervals) {
159162

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,10 +33,16 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3333
import org.springframework.kafka.core.DefaultKafkaProducerFactory
3434
import org.springframework.kafka.core.KafkaTemplate
3535
import org.springframework.kafka.core.ProducerFactory
36+
import org.springframework.kafka.listener.BatchErrorHandler
37+
import org.springframework.kafka.listener.BatchMessageListener
38+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
39+
import org.springframework.kafka.listener.ErrorHandler
40+
import org.springframework.kafka.listener.MessageListener
3641
import org.springframework.kafka.test.EmbeddedKafkaBroker
3742
import org.springframework.kafka.test.context.EmbeddedKafka
3843
import org.springframework.test.annotation.DirtiesContext
3944
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
45+
import java.lang.Exception
4046
import java.util.concurrent.CountDownLatch
4147
import java.util.concurrent.TimeUnit
4248

@@ -48,7 +54,7 @@ import java.util.concurrent.TimeUnit
4854

4955
@SpringJUnitConfig
5056
@DirtiesContext
51-
@EmbeddedKafka(topics = ["kotlinTestTopic"])
57+
@EmbeddedKafka(topics = ["kotlinTestTopic1", "kotlinBatchTestTopic1", "kotlinTestTopic2", "kotlinBatchTestTopic2"])
5258
class EnableKafkaKotlinTests {
5359

5460
@Autowired
@@ -59,26 +65,55 @@ class EnableKafkaKotlinTests {
5965

6066
@Test
6167
fun `test listener`() {
62-
this.template.send("kotlinTestTopic", "foo")
63-
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
68+
this.template.send("kotlinTestTopic1", "foo")
69+
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
6470
assertThat(this.config.received).isEqualTo("foo")
6571
}
6672

73+
@Test
74+
fun `test checkedEx`() {
75+
this.template.send("kotlinTestTopic2", "fail")
76+
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue()
77+
assertThat(this.config.error).isTrue()
78+
}
79+
6780
@Test
6881
fun `test batch listener`() {
69-
this.template.send("kotlinTestTopic", "foo")
70-
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
82+
this.template.send("kotlinBatchTestTopic1", "foo")
83+
assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue()
7184
assertThat(this.config.batchReceived).isEqualTo("foo")
7285
}
7386

87+
@Test
88+
fun `test batch checkedEx`() {
89+
this.template.send("kotlinBatchTestTopic2", "fail")
90+
assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue()
91+
assertThat(this.config.batchError).isTrue()
92+
}
93+
7494
@Configuration
7595
@EnableKafka
7696
class Config {
7797

98+
@Volatile
7899
lateinit var received: String
100+
101+
@Volatile
79102
lateinit var batchReceived: String
80103

81-
val latch = CountDownLatch(2)
104+
@Volatile
105+
var error: Boolean = false
106+
107+
@Volatile
108+
var batchError: Boolean = false
109+
110+
val latch1 = CountDownLatch(1)
111+
112+
val latch2 = CountDownLatch(1)
113+
114+
val batchLatch1 = CountDownLatch(1)
115+
116+
val batchLatch2 = CountDownLatch(1)
82117

83118
@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
84119
private lateinit var brokerAddresses: String
@@ -108,33 +143,78 @@ class EnableKafkaKotlinTests {
108143
return KafkaTemplate(kpf())
109144
}
110145

146+
val eh = ErrorHandler { _, recs : ConsumerRecord<*, *>? ->
147+
if (recs != null) {
148+
this.error = true;
149+
this.latch2.countDown()
150+
}
151+
}
152+
111153
@Bean
112154
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
113155
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
114156
= ConcurrentKafkaListenerContainerFactory()
115157
factory.consumerFactory = kcf()
158+
factory.setErrorHandler(eh)
116159
return factory
117160
}
118161

162+
val beh = BatchErrorHandler { _, recs ->
163+
if (!recs.isEmpty) {
164+
this.batchError = true;
165+
this.batchLatch2.countDown()
166+
}
167+
}
168+
119169
@Bean
120170
fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
121171
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
122172
= ConcurrentKafkaListenerContainerFactory()
123173
factory.isBatchListener = true
124174
factory.consumerFactory = kcf()
175+
factory.setBatchErrorHandler(beh)
125176
return factory
126177
}
127178

128-
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"], containerFactory = "kafkaListenerContainerFactory")
179+
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
129180
fun listen(value: String) {
130181
this.received = value
131-
this.latch.countDown()
182+
this.latch1.countDown()
132183
}
133184

134-
@KafkaListener(id = "kotlin-batch", topics = ["kotlinTestTopic"], containerFactory = "kafkaBatchListenerContainerFactory")
185+
@KafkaListener(id = "kotlin-batch", topics = ["kotlinBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory")
135186
fun batchListen(values: List<ConsumerRecord<String, String>>) {
136187
this.batchReceived = values.first().value()
137-
this.latch.countDown()
188+
this.batchLatch1.countDown()
189+
}
190+
191+
@Bean
192+
fun checkedEx(kafkaListenerContainerFactory : ConcurrentKafkaListenerContainerFactory<String, String>) :
193+
ConcurrentMessageListenerContainer<String, String> {
194+
195+
val container = kafkaListenerContainerFactory.createContainer("kotlinTestTopic2")
196+
container.containerProperties.groupId = "checkedEx"
197+
container.containerProperties.messageListener = MessageListener<String, String> {
198+
if (it.value() == "fail") {
199+
throw Exception("checked")
200+
}
201+
}
202+
return container;
203+
}
204+
205+
@Bean
206+
fun batchCheckedEx(kafkaBatchListenerContainerFactory :
207+
ConcurrentKafkaListenerContainerFactory<String, String>) :
208+
ConcurrentMessageListenerContainer<String, String> {
209+
210+
val container = kafkaBatchListenerContainerFactory.createContainer("kotlinBatchTestTopic2")
211+
container.containerProperties.groupId = "batchCheckedEx"
212+
container.containerProperties.messageListener = BatchMessageListener<String, String> {
213+
if (it.first().value() == "fail") {
214+
throw Exception("checked")
215+
}
216+
}
217+
return container;
138218
}
139219

140220
}

0 commit comments

Comments
 (0)