Skip to content

Commit 51ea8e3

Browse files
garyrussellartembilan
authored andcommitted
GH-748: Handle Kotlin/Lombok Message Listeners
See #748 (comment) (issue not resolved by this) Kotlin and Lombok (`@SneakyThrows`) can throw checked exceptions even though the method signatures do not allow it. Such exceptions were not passed to the error handlers until the failed record is no longer in scope. They were called as if the consumer itself threw an exception. Always wrap listener exceptions in `ListenerExecutionFailedException`s. Also remove Java9 constructs in `@Deprecation`s. **cherry-pick to 2.6.x, 2.5.x** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java # spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java # spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java
1 parent da2eab6 commit 51ea8e3

File tree

4 files changed

+169
-64
lines changed

4 files changed

+169
-64
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/reactive/ReactiveKafkaProducerTemplate.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,17 @@ public <T> Flux<SenderResult<T>> send(Publisher<? extends SenderRecord<K, V, T>>
121121
return this.sender.send(records);
122122
}
123123

124-
public Mono<Void> flush() {
124+
/**
125+
* Flush the producer.
126+
* @return {@link Mono#empty()}.
127+
* @deprecated - flush does not make sense in the context of a reactive flow since,
128+
* the send completion signal is a send result, which implies that a flush is
129+
* redundant. If you use this method with reactor-kafka 1.3 or later, it must be
130+
* scheduled to avoid a deadlock; see
131+
* https://issues.apache.org/jira/browse/KAFKA-10790 (since 2.7).
132+
*/
133+
@Deprecated
134+
public Mono<?> flush() {
125135
return doOnProducer(producer -> {
126136
producer.flush();
127137
return null;

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 64 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,10 +1609,9 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
16091609
}
16101610

16111611
private void batchAfterRollback(final ConsumerRecords<K, V> records,
1612-
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
1612+
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException rollbackException,
16131613
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
16141614

1615-
RuntimeException rollbackException = decorateException(e);
16161615
try {
16171616
if (recordList == null) {
16181617
afterRollbackProcessorToUse.process(createRecordList(records), this.consumer,
@@ -1776,32 +1775,37 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
17761775
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
17771776
List<ConsumerRecord<K, V>> recordList) {
17781777

1779-
switch (this.listenerType) {
1780-
case ACKNOWLEDGING_CONSUMER_AWARE:
1781-
this.batchListener.onMessage(recordList,
1782-
this.isAnyManualAck
1783-
? new ConsumerBatchAcknowledgment(records)
1784-
: null, this.consumer);
1785-
break;
1786-
case ACKNOWLEDGING:
1787-
this.batchListener.onMessage(recordList,
1788-
this.isAnyManualAck
1789-
? new ConsumerBatchAcknowledgment(records)
1790-
: null);
1791-
break;
1792-
case CONSUMER_AWARE:
1793-
this.batchListener.onMessage(recordList, this.consumer);
1794-
break;
1795-
case SIMPLE:
1796-
this.batchListener.onMessage(recordList);
1797-
break;
1778+
try {
1779+
switch (this.listenerType) {
1780+
case ACKNOWLEDGING_CONSUMER_AWARE:
1781+
this.batchListener.onMessage(recordList,
1782+
this.isAnyManualAck
1783+
? new ConsumerBatchAcknowledgment(records)
1784+
: null, this.consumer);
1785+
break;
1786+
case ACKNOWLEDGING:
1787+
this.batchListener.onMessage(recordList,
1788+
this.isAnyManualAck
1789+
? new ConsumerBatchAcknowledgment(records)
1790+
: null);
1791+
break;
1792+
case CONSUMER_AWARE:
1793+
this.batchListener.onMessage(recordList, this.consumer);
1794+
break;
1795+
case SIMPLE:
1796+
this.batchListener.onMessage(recordList);
1797+
break;
1798+
}
1799+
}
1800+
catch (Exception ex) { // NOSONAR
1801+
throw decorateException(ex);
17981802
}
17991803
}
18001804

18011805
private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
1802-
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException e) {
1806+
@Nullable List<ConsumerRecord<K, V>> list, RuntimeException rte) {
18031807

1804-
this.batchErrorHandler.handle(decorateException(e), records, this.consumer,
1808+
this.batchErrorHandler.handle(rte, records, this.consumer,
18051809
KafkaMessageListenerContainer.this.thisOrParentContainer,
18061810
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
18071811
}
@@ -1860,9 +1864,9 @@ public void doInTransactionWithoutResult(TransactionStatus s) {
18601864
}
18611865
break;
18621866
}
1863-
catch (RuntimeException e) {
1864-
this.logger.error(e, "Transaction rolled back");
1865-
recordAfterRollback(iterator, record, decorateException(e));
1867+
catch (RuntimeException ex) {
1868+
this.logger.error(ex, "Transaction rolled back");
1869+
recordAfterRollback(iterator, record, ex);
18661870
}
18671871
finally {
18681872
if (this.producerPerConsumerPartition) {
@@ -2064,31 +2068,36 @@ record = this.recordInterceptor.intercept(record);
20642068
+ ListenerUtils.recordToString(recordArg));
20652069
}
20662070
else {
2067-
switch (this.listenerType) {
2068-
case ACKNOWLEDGING_CONSUMER_AWARE:
2069-
this.listener.onMessage(record,
2070-
this.isAnyManualAck
2071-
? new ConsumerAcknowledgment(record)
2072-
: null, this.consumer);
2073-
break;
2074-
case CONSUMER_AWARE:
2075-
this.listener.onMessage(record, this.consumer);
2076-
break;
2077-
case ACKNOWLEDGING:
2078-
this.listener.onMessage(record,
2079-
this.isAnyManualAck
2080-
? new ConsumerAcknowledgment(record)
2081-
: null);
2082-
break;
2083-
case SIMPLE:
2084-
this.listener.onMessage(record);
2085-
break;
2071+
try {
2072+
switch (this.listenerType) {
2073+
case ACKNOWLEDGING_CONSUMER_AWARE:
2074+
this.listener.onMessage(record,
2075+
this.isAnyManualAck
2076+
? new ConsumerAcknowledgment(record)
2077+
: null, this.consumer);
2078+
break;
2079+
case CONSUMER_AWARE:
2080+
this.listener.onMessage(record, this.consumer);
2081+
break;
2082+
case ACKNOWLEDGING:
2083+
this.listener.onMessage(record,
2084+
this.isAnyManualAck
2085+
? new ConsumerAcknowledgment(record)
2086+
: null);
2087+
break;
2088+
case SIMPLE:
2089+
this.listener.onMessage(record);
2090+
break;
2091+
}
2092+
}
2093+
catch (Exception ex) { // NOSONAR
2094+
throw decorateException(ex);
20862095
}
20872096
}
20882097
}
20892098

20902099
private void invokeErrorHandler(final ConsumerRecord<K, V> record,
2091-
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException e) {
2100+
Iterator<ConsumerRecord<K, V>> iterator, RuntimeException rte) {
20922101

20932102
if (this.errorHandler instanceof RemainingRecordsErrorHandler) {
20942103
if (this.producer == null) {
@@ -2099,30 +2108,33 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
20992108
while (iterator.hasNext()) {
21002109
records.add(iterator.next());
21012110
}
2102-
this.errorHandler.handle(decorateException(e), records, this.consumer,
2111+
this.errorHandler.handle(rte, records, this.consumer,
21032112
KafkaMessageListenerContainer.this.thisOrParentContainer);
21042113
}
21052114
else {
2106-
this.errorHandler.handle(decorateException(e), record, this.consumer);
2115+
this.errorHandler.handle(rte, record, this.consumer);
21072116
}
21082117
}
21092118

2110-
private RuntimeException decorateException(RuntimeException e) {
2111-
RuntimeException toHandle = e;
2119+
private RuntimeException decorateException(Exception ex) {
2120+
Exception toHandle = ex;
21122121
if (toHandle instanceof ListenerExecutionFailedException) {
21132122
toHandle = new ListenerExecutionFailedException(toHandle.getMessage(), this.consumerGroupId,
21142123
toHandle.getCause());
21152124
}
21162125
else {
21172126
toHandle = new ListenerExecutionFailedException("Listener failed", this.consumerGroupId, toHandle);
21182127
}
2119-
return toHandle;
2128+
return (RuntimeException) toHandle;
21202129
}
21212130

21222131
public void checkDeser(final ConsumerRecord<K, V> record, String headerName) {
21232132
DeserializationException exception = ListenerUtils.getExceptionFromHeader(record, headerName, this.logger);
21242133
if (exception != null) {
2125-
throw exception;
2134+
/*
2135+
* Wrapping in a LEFE is not strictly correct, but required for backwards compatibility.
2136+
*/
2137+
throw decorateException(exception);
21262138
}
21272139
}
21282140

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,10 @@ public static String recordToString(ConsumerRecord<?, ?> record, boolean meta) {
153153
* @param lastIntervals a thread local containing the previous {@link BackOff}
154154
* interval for this thread.
155155
* @since 2.3.12
156+
* @deprecated since 2.7 in favor of
157+
* {@link #unrecoverableBackOff(BackOff, ThreadLocal, ThreadLocal, MessageListenerContainer)}.
156158
*/
159+
@Deprecated
157160
public static void unrecoverableBackOff(BackOff backOff, ThreadLocal<BackOffExecution> executions,
158161
ThreadLocal<Long> lastIntervals) {
159162

spring-kafka/src/test/kotlin/org/springframework/kafka/listener/EnableKafkaKotlinTests.kt

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,10 +33,16 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3333
import org.springframework.kafka.core.DefaultKafkaProducerFactory
3434
import org.springframework.kafka.core.KafkaTemplate
3535
import org.springframework.kafka.core.ProducerFactory
36+
import org.springframework.kafka.listener.BatchErrorHandler
37+
import org.springframework.kafka.listener.BatchMessageListener
38+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer
39+
import org.springframework.kafka.listener.ErrorHandler
40+
import org.springframework.kafka.listener.MessageListener
3641
import org.springframework.kafka.test.EmbeddedKafkaBroker
3742
import org.springframework.kafka.test.context.EmbeddedKafka
3843
import org.springframework.test.annotation.DirtiesContext
3944
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
45+
import java.lang.Exception
4046
import java.util.concurrent.CountDownLatch
4147
import java.util.concurrent.TimeUnit
4248

@@ -48,7 +54,7 @@ import java.util.concurrent.TimeUnit
4854

4955
@SpringJUnitConfig
5056
@DirtiesContext
51-
@EmbeddedKafka(topics = ["kotlinTestTopic"])
57+
@EmbeddedKafka(topics = ["kotlinTestTopic1", "kotlinBatchTestTopic1", "kotlinTestTopic2", "kotlinBatchTestTopic2"])
5258
class EnableKafkaKotlinTests {
5359

5460
@Autowired
@@ -59,26 +65,55 @@ class EnableKafkaKotlinTests {
5965

6066
@Test
6167
fun `test listener`() {
62-
this.template.send("kotlinTestTopic", "foo")
63-
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
68+
this.template.send("kotlinTestTopic1", "foo")
69+
assertThat(this.config.latch1.await(10, TimeUnit.SECONDS)).isTrue()
6470
assertThat(this.config.received).isEqualTo("foo")
6571
}
6672

73+
@Test
74+
fun `test checkedEx`() {
75+
this.template.send("kotlinTestTopic2", "fail")
76+
assertThat(this.config.latch2.await(10, TimeUnit.SECONDS)).isTrue()
77+
assertThat(this.config.error).isTrue()
78+
}
79+
6780
@Test
6881
fun `test batch listener`() {
69-
this.template.send("kotlinTestTopic", "foo")
70-
assertThat(this.config.latch.await(10, TimeUnit.SECONDS)).isTrue()
82+
this.template.send("kotlinBatchTestTopic1", "foo")
83+
assertThat(this.config.batchLatch1.await(10, TimeUnit.SECONDS)).isTrue()
7184
assertThat(this.config.batchReceived).isEqualTo("foo")
7285
}
7386

87+
@Test
88+
fun `test batch checkedEx`() {
89+
this.template.send("kotlinBatchTestTopic2", "fail")
90+
assertThat(this.config.batchLatch2.await(10, TimeUnit.SECONDS)).isTrue()
91+
assertThat(this.config.batchError).isTrue()
92+
}
93+
7494
@Configuration
7595
@EnableKafka
7696
class Config {
7797

98+
@Volatile
7899
lateinit var received: String
100+
101+
@Volatile
79102
lateinit var batchReceived: String
80103

81-
val latch = CountDownLatch(2)
104+
@Volatile
105+
var error: Boolean = false
106+
107+
@Volatile
108+
var batchError: Boolean = false
109+
110+
val latch1 = CountDownLatch(1)
111+
112+
val latch2 = CountDownLatch(1)
113+
114+
val batchLatch1 = CountDownLatch(1)
115+
116+
val batchLatch2 = CountDownLatch(1)
82117

83118
@Value("\${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
84119
private lateinit var brokerAddresses: String
@@ -108,33 +143,78 @@ class EnableKafkaKotlinTests {
108143
return KafkaTemplate(kpf())
109144
}
110145

146+
val eh = ErrorHandler { _, recs : ConsumerRecord<*, *>? ->
147+
if (recs != null) {
148+
this.error = true;
149+
this.latch2.countDown()
150+
}
151+
}
152+
111153
@Bean
112154
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
113155
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
114156
= ConcurrentKafkaListenerContainerFactory()
115157
factory.consumerFactory = kcf()
158+
factory.setErrorHandler(eh)
116159
return factory
117160
}
118161

162+
val beh = BatchErrorHandler { _, recs ->
163+
if (!recs.isEmpty) {
164+
this.batchError = true;
165+
this.batchLatch2.countDown()
166+
}
167+
}
168+
119169
@Bean
120170
fun kafkaBatchListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
121171
val factory: ConcurrentKafkaListenerContainerFactory<String, String>
122172
= ConcurrentKafkaListenerContainerFactory()
123173
factory.isBatchListener = true
124174
factory.consumerFactory = kcf()
175+
factory.setBatchErrorHandler(beh)
125176
return factory
126177
}
127178

128-
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic"], containerFactory = "kafkaListenerContainerFactory")
179+
@KafkaListener(id = "kotlin", topics = ["kotlinTestTopic1"], containerFactory = "kafkaListenerContainerFactory")
129180
fun listen(value: String) {
130181
this.received = value
131-
this.latch.countDown()
182+
this.latch1.countDown()
132183
}
133184

134-
@KafkaListener(id = "kotlin-batch", topics = ["kotlinTestTopic"], containerFactory = "kafkaBatchListenerContainerFactory")
185+
@KafkaListener(id = "kotlin-batch", topics = ["kotlinBatchTestTopic1"], containerFactory = "kafkaBatchListenerContainerFactory")
135186
fun batchListen(values: List<ConsumerRecord<String, String>>) {
136187
this.batchReceived = values.first().value()
137-
this.latch.countDown()
188+
this.batchLatch1.countDown()
189+
}
190+
191+
@Bean
192+
fun checkedEx(kafkaListenerContainerFactory : ConcurrentKafkaListenerContainerFactory<String, String>) :
193+
ConcurrentMessageListenerContainer<String, String> {
194+
195+
val container = kafkaListenerContainerFactory.createContainer("kotlinTestTopic2")
196+
container.containerProperties.groupId = "checkedEx"
197+
container.containerProperties.messageListener = MessageListener<String, String> {
198+
if (it.value() == "fail") {
199+
throw Exception("checked")
200+
}
201+
}
202+
return container;
203+
}
204+
205+
@Bean
206+
fun batchCheckedEx(kafkaBatchListenerContainerFactory :
207+
ConcurrentKafkaListenerContainerFactory<String, String>) :
208+
ConcurrentMessageListenerContainer<String, String> {
209+
210+
val container = kafkaBatchListenerContainerFactory.createContainer("kotlinBatchTestTopic2")
211+
container.containerProperties.groupId = "batchCheckedEx"
212+
container.containerProperties.messageListener = BatchMessageListener<String, String> {
213+
if (it.first().value() == "fail") {
214+
throw Exception("checked")
215+
}
216+
}
217+
return container;
138218
}
139219

140220
}

0 commit comments

Comments
 (0)