
Commit 8323d63

ReplyingKTemplate Handle DeserializationException
Handle `DeserializationException` via `ErrorHandlingDeserializer` in `ReplyingKafkaTemplate`. **I will do the backports after review/merge; I expect conflicts**
1 parent 5b2fd49 commit 8323d63

3 files changed, +119 -11 lines changed


spring-kafka/src/main/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplate.java

Lines changed: 51 additions & 6 deletions
@@ -37,14 +37,19 @@
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.context.SmartLifecycle;
+import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.listener.BatchMessageListener;
 import org.springframework.kafka.listener.ContainerProperties;
 import org.springframework.kafka.listener.GenericMessageListenerContainer;
+import org.springframework.kafka.listener.ListenerUtils;
 import org.springframework.kafka.support.KafkaHeaders;
 import org.springframework.kafka.support.TopicPartitionOffset;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
 import org.springframework.lang.Nullable;
 import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
@@ -92,8 +97,7 @@ public class ReplyingKafkaTemplate<K, V, R> extends KafkaTemplate<K, V> implemen
 
 	private boolean sharedReplyTopic;
 
-	private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy =
-			ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
+	private Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy = ReplyingKafkaTemplate::defaultCorrelationIdStrategy;
 
 	private String correlationHeaderName = KafkaHeaders.CORRELATION_ID;
 
@@ -230,8 +234,8 @@ public Collection<TopicPartition> getAssignedReplyTopicPartitions() {
 	}
 
 	/**
-	 * Set to true when multiple templates are using the same topic for replies.
-	 * This simply changes logs for unexpected replies to debug instead of error.
+	 * Set to true when multiple templates are using the same topic for replies. This
+	 * simply changes logs for unexpected replies to debug instead of error.
 	 * @param sharedReplyTopic true if using a shared topic.
 	 * @since 2.2
 	 */
@@ -439,13 +443,54 @@ public void onMessage(List<ConsumerRecord<K, R>> data) {
 					logLateArrival(record, correlationId);
 				}
 				else {
-					this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
-					future.set(record);
+					boolean ok = true;
+					if (record.value() == null) {
+						DeserializationException de = checkDeserialization(record, this.logger);
+						if (de != null) {
+							ok = false;
+							future.setException(de);
+						}
+					}
+					if (ok) {
+						this.logger.debug(() -> "Received: " + record + WITH_CORRELATION_ID + correlationKey);
+						future.set(record);
+					}
 				}
 			}
 		});
 	}
 
+	/**
+	 * Return a {@link DeserializationException} if either the key or value failed
+	 * deserialization; null otherwise. If you need to determine whether it was the key or
+	 * value, call
+	 * {@link ListenerUtils#getExceptionFromHeader(ConsumerRecord, String, LogAccessor)}
+	 * with {@link ErrorHandlingDeserializer#KEY_DESERIALIZER_EXCEPTION_HEADER} and
+	 * {@link ErrorHandlingDeserializer#VALUE_DESERIALIZER_EXCEPTION_HEADER} instead.
+	 * @param record the record.
+	 * @param logger a {@link LogAccessor}.
+	 * @return the {@link DeserializationException} or {@code null}.
+	 * @since 2.2.15
+	 */
+	@Nullable
+	public static DeserializationException checkDeserialization(ConsumerRecord<?, ?> record, LogAccessor logger) {
+		DeserializationException exception = ListenerUtils.getExceptionFromHeader(record,
+				ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, logger);
+		if (exception != null) {
+			logger.error(exception, () -> "Reply value deserialization failed for " + record.topic() + "-"
+					+ record.partition() + "@" + record.offset());
+			return exception;
+		}
+		exception = ListenerUtils.getExceptionFromHeader(record,
+				ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER, logger);
+		if (exception != null) {
+			logger.error(exception, () -> "Reply key deserialization failed for " + record.topic() + "-"
					+ record.partition() + "@" + record.offset());
+			return exception;
+		}
+		return null;
+	}
+
 	protected void logLateArrival(ConsumerRecord<K, R> record, CorrelationKey correlationId) {
 		if (this.sharedReplyTopic) {
 			this.logger.debug(() -> missingCorrelationLogMessage(record, correlationId));
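The javadoc on the new `checkDeserialization()` method points callers that need to know whether the key or the value failed at `ListenerUtils.getExceptionFromHeader(...)`. A minimal caller-side sketch of that approach, assuming the API shown in this diff; the class and the `describeFailure` helper below are illustrative, not part of the commit:

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;

public final class ReplyDeserializationInspector {

	private static final LogAccessor LOGGER =
			new LogAccessor(LogFactory.getLog(ReplyDeserializationInspector.class));

	private ReplyDeserializationInspector() {
	}

	// Report whether the reply's key, value, or neither failed deserialization,
	// reading the same headers that checkDeserialization(...) inspects.
	public static String describeFailure(ConsumerRecord<?, ?> reply) {
		DeserializationException valueEx = ListenerUtils.getExceptionFromHeader(reply,
				ErrorHandlingDeserializer2.VALUE_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
		if (valueEx != null) {
			return "reply value failed deserialization: " + valueEx.getMessage();
		}
		DeserializationException keyEx = ListenerUtils.getExceptionFromHeader(reply,
				ErrorHandlingDeserializer2.KEY_DESERIALIZER_EXCEPTION_HEADER, LOGGER);
		if (keyEx != null) {
			return "reply key failed deserialization: " + keyEx.getMessage();
		}
		return "no deserialization failure";
	}

}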

spring-kafka/src/test/java/org/springframework/kafka/requestreply/ReplyingKafkaTemplateTests.java

Lines changed: 58 additions & 4 deletions
@@ -52,6 +52,7 @@
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -79,6 +80,8 @@
 import org.springframework.kafka.support.SimpleKafkaHeaderMapper;
 import org.springframework.kafka.support.TopicPartitionOffset;
 import org.springframework.kafka.support.converter.MessagingMessageConverter;
+import org.springframework.kafka.support.serializer.DeserializationException;
+import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -103,7 +106,7 @@
 		ReplyingKafkaTemplateTests.D_REPLY, ReplyingKafkaTemplateTests.D_REQUEST,
 		ReplyingKafkaTemplateTests.E_REPLY, ReplyingKafkaTemplateTests.E_REQUEST,
 		ReplyingKafkaTemplateTests.F_REPLY, ReplyingKafkaTemplateTests.F_REQUEST,
-		ReplyingKafkaTemplateTests.G_REPLY, ReplyingKafkaTemplateTests.G_REQUEST })
+		ReplyingKafkaTemplateTests.J_REPLY, ReplyingKafkaTemplateTests.J_REQUEST })
 public class ReplyingKafkaTemplateTests {
 
 	public static final String A_REPLY = "aReply";
@@ -134,6 +137,10 @@ public class ReplyingKafkaTemplateTests {
 
 	public static final String G_REQUEST = "gRequest";
 
+	public static final String J_REPLY = "jReply";
+
+	public static final String J_REQUEST = "jRequest";
+
 	@Autowired
 	private EmbeddedKafkaBroker embeddedKafka;
 
@@ -180,6 +187,25 @@ public void testGood() throws Exception {
 		}
 	}
 
+	@Test
+	public void testBadDeserialize() throws Exception {
+		ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(J_REPLY, true);
+		try {
+			template.setDefaultReplyTimeout(Duration.ofSeconds(30));
+			Headers headers = new RecordHeaders();
+			headers.add("baz", "buz".getBytes());
+			ProducerRecord<Integer, String> record = new ProducerRecord<>(J_REQUEST, null, null, null, "foo", headers);
+			RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
+			future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
+			assertThatExceptionOfType(ExecutionException.class).isThrownBy(() -> future.get(10, TimeUnit.SECONDS))
+					.withCauseExactlyInstanceOf(DeserializationException.class);
+		}
+		finally {
+			template.stop();
+			template.destroy();
+		}
+	}
+
 	@Test
 	public void testMultiListenerMessageReturn() throws Exception {
 		ReplyingKafkaTemplate<Integer, String, String> template = createTemplate(C_REPLY);
@@ -428,6 +454,12 @@ public void testAggregateOrphansNotStored() throws Exception {
 	}
 
 	public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic) throws Exception {
+		return createTemplate(topic, false);
+	}
+
+	public ReplyingKafkaTemplate<Integer, String, String> createTemplate(String topic, boolean badDeser)
+			throws Exception {
+
 		ContainerProperties containerProperties = new ContainerProperties(topic);
 		final CountDownLatch latch = new CountDownLatch(1);
 		containerProperties.setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@@ -443,9 +475,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 			}
 
 		});
-		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false",
-				embeddedKafka);
+		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false", embeddedKafka);
 		consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+		if (badDeser) {
+			consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
+			consumerProps.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, BadDeser.class);
+		}
 		DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
 		KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf,
 				containerProperties);
@@ -461,7 +496,6 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
 	}
 
 	public ReplyingKafkaTemplate<Integer, String, String> createTemplate(TopicPartitionOffset topic) {
-
 		ContainerProperties containerProperties = new ContainerProperties(topic);
 		Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(this.testName, "false",
 				embeddedKafka);
@@ -630,6 +664,12 @@ public void gListener(Message<String> in) {
 				in.getHeaders().get("custom.correlation.id", byte[].class)));
 		template().send(record);
 	}
+	@KafkaListener(id = J_REQUEST, topics = J_REQUEST)
+	@SendTo  // default REPLY_TOPIC header
+	public String handleJ(String in) throws InterruptedException {
+		return in.toUpperCase();
+	}
+
 }
 
 @KafkaListener(topics = C_REQUEST, groupId = C_REQUEST)
@@ -648,4 +688,18 @@ public Message<?> listen1(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] re
 
 	}
 
+	public static class BadDeser implements Deserializer<Object> {
+
+		@Override
+		public Object deserialize(String topic, byte[] data) {
+			return null;
+		}
+
+		@Override
+		public Object deserialize(String topic, Headers headers, byte[] data) {
+			throw new IllegalStateException("test reply deserialization failure");
+		}
+
+	}
+
 }

src/reference/asciidoc/kafka.adoc

Lines changed: 10 additions & 1 deletion
@@ -414,6 +414,9 @@ public class KRequestingApplication {
 
 Note that we can use Boot's auto-configured container factory to create the reply container.
 
+If a non-trivial deserializer is being used for replies, consider using an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> that delegates to your configured deserializer.
+When so configured, the `RequestReplyFuture` will be completed exceptionally and you can catch the `ExecutionException`, with the `DeserializationException` in its `cause` property.
+
 The template sets a header (named `KafkaHeaders.CORRELATION_ID` by default), which must be echoed back by the server side.
 
 In this case, the following `@KafkaListener` application responds:
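The paragraph added in the hunk above describes the setup only in prose. Below is a minimal sketch of what it implies, not part of this commit or of the referenced doc example: the reply consumer's value deserializer is wrapped in `ErrorHandlingDeserializer2` (the concrete class in this version), so a bad reply completes the `RequestReplyFuture` exceptionally. The bootstrap servers, group id, topic name, `JsonDeserializer` delegate, and class/method names are illustrative assumptions.

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class ReplyErrorHandlingExample {

	// Reply-side consumer properties: ErrorHandlingDeserializer2 wraps the real value
	// deserializer, so a failed reply surfaces as headers instead of a poison pill.
	public static Map<String, Object> replyConsumerProps() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-replies-group");
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
		props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
		return props;
	}

	// With this commit applied, a reply that fails deserialization completes the future
	// exceptionally; the DeserializationException is the cause of the ExecutionException.
	public static void sendAndCheck(ReplyingKafkaTemplate<String, String, String> template)
			throws InterruptedException, TimeoutException {

		template.setDefaultReplyTimeout(Duration.ofSeconds(30));
		RequestReplyFuture<String, String, String> future =
				template.sendAndReceive(new ProducerRecord<>("my-requests", "some request"));
		try {
			ConsumerRecord<String, String> reply = future.get(30, TimeUnit.SECONDS);
			// use reply.value() ...
		}
		catch (ExecutionException e) {
			if (e.getCause() instanceof DeserializationException) {
				byte[] raw = ((DeserializationException) e.getCause()).getData(); // the un-deserializable bytes
			}
		}
	}

}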
@@ -557,6 +560,12 @@ IMPORTANT: The listener container for the replies MUST be configured with `AckMo
 To avoid any possibility of losing messages, the template only commits offsets when there are zero requests outstanding, i.e. when the last outstanding request is released by the release strategy.
 After a rebalance, it is possible for duplicate reply deliveries; these will be ignored for any in-flight requests; you may see error log messages when duplicate replies are received for already released replies.
 
+NOTE: If you use an <<error-handling-deserializer,`ErrorHandlingDeserializer`>> with this aggregating template, the framework will not automatically detect `DeserializationException` s.
+Instead, the record (with a `null` value) will be returned intact, with the deserialization exception(s) in headers.
+It is recommended that applications call the utility method `ReplyingKafkaTemplate.checkDeserialization()` to determine whether a deserialization exception occurred.
+See its javadocs for more information.
+
+[[receiving-messages]]
 ==== Receiving Messages
 
 You can receive messages by configuring a `MessageListenerContainer` and providing a message listener or by using the `@KafkaListener` annotation.
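A sketch of the manual check the NOTE added above recommends for the aggregating case, assuming the aggregating template's usual `Collection<ConsumerRecord<K, R>>` reply type; the class and method names here are illustrative, not from the commit or the docs:

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.kafka.support.serializer.DeserializationException;

public class AggregatedReplyCheckExample {

	private static final LogAccessor LOGGER =
			new LogAccessor(LogFactory.getLog(AggregatedReplyCheckExample.class));

	// Inspect every gathered reply for a deserialization failure, since the aggregating
	// template returns such records intact with the exception(s) in headers.
	public static void inspect(
			RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future)
			throws Exception {

		ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> aggregate =
				future.get(30, TimeUnit.SECONDS);
		for (ConsumerRecord<Integer, String> reply : aggregate.value()) {
			DeserializationException ex = ReplyingKafkaTemplate.checkDeserialization(reply, LOGGER);
			if (ex != null) {
				// this reply failed deserialization; skip it or fail the whole operation
			}
		}
	}

}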
@@ -3023,7 +3032,7 @@ CAUTION: When you use a `BatchMessageListener`, you must provide a `failedDeseri
 Otherwise, the batch of records are not type safe.
 
 You can use the `DefaultKafkaConsumerFactory` constructor that takes key and value `Deserializer` objects and wire in appropriate `ErrorHandlingDeserializer2` instances that you have configured with the proper delegates.
-Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer`) to instantiate the delegates.
+Alternatively, you can use consumer configuration properties (which are used by the `ErrorHandlingDeserializer2`) to instantiate the delegates.
 The property names are `ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS` and `ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS`.
 The property value can be a class or class name.
 The following example shows how to set these properties:
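The doc's actual example sits just below this hunk and is not part of the diff. As a hedged sketch of what setting those two properties can look like, with the `JsonDeserializer` delegates and the surrounding class assumed for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;

import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2;
import org.springframework.kafka.support.serializer.JsonDeserializer;

public class ErrorHandlingDeserializerProps {

	public static Map<String, Object> consumerProps() {
		Map<String, Object> props = new HashMap<>();
		// ... bootstrap servers, group id, and other consumer properties
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer2.class);
		// the delegate may be given as a class or as a class name
		props.put(ErrorHandlingDeserializer2.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
		props.put(ErrorHandlingDeserializer2.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
		return props;
	}

}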
