Commit cd7de80
GH-2711: Non-Blocking Retries and Custom Publisher
Resolves #2711

Previously, it was not easy to provide custom DLPRs, for example to override `createProducerRecord`.

* Fix Javadocs.
1 parent 1eaaa27 commit cd7de80

File tree

4 files changed: +186 −50 lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 23 additions & 0 deletions
@@ -613,6 +613,29 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
 By default, any headers added will be cumulative - Kafka headers can contain multiple values.
 Starting with version 2.9.5, if the `Headers` returned by the function contains a header of type `DeadLetterPublishingRecoverer.SingleRecordHeader`, then any existing values for that header will be removed and only the new single value will remain.
+
+[[custom-dlpr]]
+===== Custom DeadLetterPublishingRecoverer
+
+As can be seen in <<retry-headers>> it is possible to customize the default `DeadLetterPublishingRecoverer` instances created by the framework.
+However, for some use cases, it is necessary to subclass the `DeadLetterPublishingRecoverer`, for example to override `createProducerRecord()` to modify the contents sent to the retry (or dead-letter) topics.
+Starting with version 3.0.9, you can override the `RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()` method to provide a `DeadLetterPublisherCreator` instance, for example:
+
+====
+[source, java]
+----
+@Override
+protected Consumer<DeadLetterPublishingRecovererFactory>
+        configureDeadLetterPublishingContainerFactory() {
+
+    return (factory) -> factory.setDeadLetterPublisherCreator(
+            (templateResolver, destinationResolver) ->
+                    new CustomDLPR(templateResolver, destinationResolver));
+}
+----
+====
+
+It is recommended that you use the provided resolvers when constructing the custom instance.
+
 [[retry-topic-combine-blocking]]
 ==== Combining Blocking and Non-Blocking Retries

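For context, a subclass like the `CustomDLPR` referenced above might look as follows — a hedged sketch, not part of this commit; the `X-Custom` header is purely illustrative, and the override assumes the `createProducerRecord()` signature of the 3.0.x `DeadLetterPublishingRecoverer`:

[source, java]
----
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.lang.Nullable;

public class CustomDLPR extends DeadLetterPublishingRecoverer {

    public CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        // use the resolvers provided by the framework, as recommended above
        super(templateResolver, destinationResolver);
    }

    @Override
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record,
            TopicPartition topicPartition, Headers headers, @Nullable byte[] key, @Nullable byte[] value) {

        // build the outgoing record as usual, then decorate it before it is published
        ProducerRecord<Object, Object> outRecord =
                super.createProducerRecord(record, topicPartition, headers, key, value);
        outRecord.headers().add("X-Custom", "some-value".getBytes(StandardCharsets.UTF_8));
        return outRecord;
    }

}
----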
spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 56 additions & 19 deletions
@@ -81,8 +81,6 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
     private static final long THIRTY = 30L;
 
-    private final HeaderNames headerNames = getHeaderNames();
-
     private final boolean transactional;
 
     private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;
@@ -91,6 +89,8 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
     private final EnumSet<HeaderNames.HeadersToAdd> whichHeaders = EnumSet.allOf(HeaderNames.HeadersToAdd.class);
 
+    private HeaderNames headerNames = getHeaderNames();
+
     private boolean retainExceptionHeader;
 
     private BiFunction<ConsumerRecord<?, ?>, Exception, Headers> headersFunction = DEFAULT_HEADERS_FUNCTION;
@@ -115,6 +115,24 @@ public class DeadLetterPublishingRecoverer extends ExceptionClassifier implement
 
     private ExceptionHeadersCreator exceptionHeadersCreator = this::addExceptionInfoHeaders;
 
+    private Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
+            .original()
+            .offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
+            .timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
+            .timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
+            .topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
+            .partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
+            .consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
+            .exception()
+            .keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
+            .exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
+            .exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
+            .keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
+            .exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
+            .keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
+            .exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
+            .build();
+
     /**
      * Create an instance with the provided template and a default destination resolving
      * function that returns a TopicPartition based on the original topic (appended with ".DLT")
@@ -188,6 +206,23 @@ public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Obj
         this.destinationResolver = destinationResolver;
     }
 
+    /**
+     * Create an instance with a template resolving function that receives the outgoing
+     * producer record and returns the {@link KafkaOperations} to use for publishing,
+     * and a destination resolving function that receives the failed consumer record and
+     * the exception and returns a {@link TopicPartition}. If the partition in the
+     * {@link TopicPartition} is less than 0, no partition is set when publishing to the
+     * topic. Publishing from this instance will not be transactional.
+     *
+     * @param templateResolver the function that resolves the {@link KafkaOperations} to use for publishing.
+     * @param destinationResolver the destination resolving function.
+     * @since 3.0.9
+     */
+    public DeadLetterPublishingRecoverer(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+        this(templateResolver, false, destinationResolver);
+    }
+
     /**
      * Create an instance with a template resolving function that receives the failed
      * consumer record and the exception and returns a {@link KafkaOperations} and a
@@ -487,6 +522,9 @@ public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consume
     private void addAndEnhanceHeaders(ConsumerRecord<?, ?> record, Exception exception,
             @Nullable DeserializationException vDeserEx, @Nullable DeserializationException kDeserEx, Headers headers) {
 
+        if (this.headerNames == null) {
+            this.headerNames = this.headerNamesSupplier.get();
+        }
         if (kDeserEx != null) {
             if (!this.retainExceptionHeader) {
                 headers.remove(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
@@ -825,25 +863,24 @@ private String getStackTraceAsString(Throwable cause) {
      * in the sent record.
      * @return the header names.
      * @since 2.7
+     * @deprecated since 3.0.9 - provide a supplier instead.
+     * @see #setHeaderNamesSupplier(Supplier)
      */
+    @Nullable
+    @Deprecated(since = "3.0.9", forRemoval = true)
     protected HeaderNames getHeaderNames() {
-        return HeaderNames.Builder
-                .original()
-                .offsetHeader(KafkaHeaders.DLT_ORIGINAL_OFFSET)
-                .timestampHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP)
-                .timestampTypeHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)
-                .topicHeader(KafkaHeaders.DLT_ORIGINAL_TOPIC)
-                .partitionHeader(KafkaHeaders.DLT_ORIGINAL_PARTITION)
-                .consumerGroupHeader(KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP)
-                .exception()
-                .keyExceptionFqcn(KafkaHeaders.DLT_KEY_EXCEPTION_FQCN)
-                .exceptionFqcn(KafkaHeaders.DLT_EXCEPTION_FQCN)
-                .exceptionCauseFqcn(KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN)
-                .keyExceptionMessage(KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE)
-                .exceptionMessage(KafkaHeaders.DLT_EXCEPTION_MESSAGE)
-                .keyExceptionStacktrace(KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE)
-                .exceptionStacktrace(KafkaHeaders.DLT_EXCEPTION_STACKTRACE)
-                .build();
+        return null;
+    }
+
+    /**
+     * Set a {@link Supplier} for {@link HeaderNames}.
+     * @param supplier the supplier.
+     * @since 3.0.9
+     */
+    public void setHeaderNamesSupplier(Supplier<HeaderNames> supplier) {
+        Assert.notNull(supplier, "'supplier' cannot be null");
+        this.headerNamesSupplier = supplier;
     }
 
     /**

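Net effect of the changes in this file: `headerNames` is now resolved lazily — `getHeaderNames()` returns `null` by default, and `addAndEnhanceHeaders(...)` falls back to `headerNamesSupplier` on the first failed record. A hedged sketch of using the new setter to rename the standard `DLT_*` headers; the `x-...` names are illustrative only, and `template` is assumed to be an existing `KafkaOperations`:

[source, java]
----
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
// replace the default KafkaHeaders.DLT_* names with custom ones
recoverer.setHeaderNamesSupplier(() -> DeadLetterPublishingRecoverer.HeaderNames.Builder
        .original()
        .offsetHeader("x-original-offset")
        .timestampHeader("x-original-timestamp")
        .timestampTypeHeader("x-original-timestamp-type")
        .topicHeader("x-original-topic")
        .partitionHeader("x-original-partition")
        .consumerGroupHeader("x-original-consumer-group")
        .exception()
        .keyExceptionFqcn("x-key-exception-fqcn")
        .exceptionFqcn("x-exception-fqcn")
        .exceptionCauseFqcn("x-exception-cause-fqcn")
        .keyExceptionMessage("x-key-exception-message")
        .exceptionMessage("x-exception-message")
        .keyExceptionStacktrace("x-key-exception-stacktrace")
        .exceptionStacktrace("x-exception-stacktrace")
        .build());
----

Note that the supplier is only consulted when `getHeaderNames()` returned `null`, so a subclass that still overrides the deprecated method keeps its previous behavior.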
spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DeadLetterPublishingRecovererFactory.java

Lines changed: 73 additions & 29 deletions
@@ -25,6 +25,7 @@
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.function.Supplier;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,6 +39,7 @@
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.HeaderNames;
 import org.springframework.kafka.listener.DeadLetterPublishingRecoverer.SingleRecordHeader;
 import org.springframework.kafka.listener.SeekUtils;
 import org.springframework.kafka.listener.TimestampedException;
@@ -76,6 +78,8 @@ public class DeadLetterPublishingRecovererFactory {
 
     private boolean retainAllRetryHeaderValues = true;
 
+    private DeadLetterPublisherCreator dlpCreator = this::create;
+
     public DeadLetterPublishingRecovererFactory(DestinationTopicResolver destinationTopicResolver) {
         this.destinationTopicResolver = destinationTopicResolver;
     }
@@ -113,6 +117,28 @@ public void setRetainAllRetryHeaderValues(boolean retainAllRetryHeaderValues) {
         this.retainAllRetryHeaderValues = retainAllRetryHeaderValues;
     }
 
+    /**
+     * Provide a {@link DeadLetterPublisherCreator}; used to create a subclass of the
+     * {@link DeadLetterPublishingRecoverer}, instead of the default, for example, to
+     * modify the published records.
+     * @param creator the creator.
+     * @since 3.0.9
+     */
+    public void setDeadLetterPublisherCreator(DeadLetterPublisherCreator creator) {
+        Assert.notNull(creator, "'creator' cannot be null");
+        this.dlpCreator = creator;
+    }
+
+    /**
+     * Set a customizer to customize the default {@link DeadLetterPublishingRecoverer}.
+     * @param customizer the customizer.
+     * @see #setDeadLetterPublisherCreator(DeadLetterPublisherCreator)
+     */
+    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
+        Assert.notNull(customizer, "'customizer' cannot be null");
+        this.recovererCustomizer = customizer;
+    }
+
     /**
      * Add exception type to the default list. By default, the following exceptions will
      * not be retried:
@@ -175,31 +201,26 @@ public void alwaysLogListenerException() {
     @SuppressWarnings("unchecked")
     public DeadLetterPublishingRecoverer create(String mainListenerId) {
         Assert.notNull(mainListenerId, "'listenerId' cannot be null");
-        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(// NOSONAR anon. class size
-                templateResolver(mainListenerId), false, destinationResolver(mainListenerId)) {
-
-            @Override
-            protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
-                return DeadLetterPublishingRecoverer.HeaderNames.Builder
-                        .original()
-                        .offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
-                        .timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
-                        .timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
-                        .topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
-                        .partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
-                        .consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
-                        .exception()
-                        .keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
-                        .exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
-                        .exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
-                        .keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
-                        .exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
-                        .keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
-                        .exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
-                        .build();
-            }
-        };
-
+        Supplier<HeaderNames> headerNamesSupplier = () -> HeaderNames.Builder
+                .original()
+                .offsetHeader(KafkaHeaders.ORIGINAL_OFFSET)
+                .timestampHeader(KafkaHeaders.ORIGINAL_TIMESTAMP)
+                .timestampTypeHeader(KafkaHeaders.ORIGINAL_TIMESTAMP_TYPE)
+                .topicHeader(KafkaHeaders.ORIGINAL_TOPIC)
+                .partitionHeader(KafkaHeaders.ORIGINAL_PARTITION)
+                .consumerGroupHeader(KafkaHeaders.ORIGINAL_CONSUMER_GROUP)
+                .exception()
+                .keyExceptionFqcn(KafkaHeaders.KEY_EXCEPTION_FQCN)
+                .exceptionFqcn(KafkaHeaders.EXCEPTION_FQCN)
+                .exceptionCauseFqcn(KafkaHeaders.EXCEPTION_CAUSE_FQCN)
+                .keyExceptionMessage(KafkaHeaders.KEY_EXCEPTION_MESSAGE)
+                .exceptionMessage(KafkaHeaders.EXCEPTION_MESSAGE)
+                .keyExceptionStacktrace(KafkaHeaders.KEY_EXCEPTION_STACKTRACE)
+                .exceptionStacktrace(KafkaHeaders.EXCEPTION_STACKTRACE)
+                .build();
+        DeadLetterPublishingRecoverer recoverer = this.dlpCreator.create(templateResolver(mainListenerId),
+                destinationResolver(mainListenerId));
+        recoverer.setHeaderNamesSupplier(headerNamesSupplier);
         recoverer.setHeadersFunction(
                 (consumerRecord, e) -> addHeaders(mainListenerId, consumerRecord, e, getAttempts(consumerRecord)));
         if (this.headersFunction != null) {
@@ -215,16 +236,19 @@ protected DeadLetterPublishingRecoverer.HeaderNames getHeaderNames() {
         return recoverer;
     }
 
+    private DeadLetterPublishingRecoverer create(
+            Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+
+        return new DeadLetterPublishingRecoverer(templateResolver, destinationResolver);
+    }
+
     private Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver(String mainListenerId) {
         return outRecord -> this.destinationTopicResolver
                 .getDestinationTopicByName(mainListenerId, outRecord.topic())
                 .getKafkaOperations();
     }
 
-    public void setDeadLetterPublishingRecovererCustomizer(Consumer<DeadLetterPublishingRecoverer> customizer) {
-        this.recovererCustomizer = customizer;
-    }
-
     private BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver(String mainListenerId) {
         return (cr, ex) -> {
             if (SeekUtils.isBackoffException(ex)) {
@@ -412,4 +436,24 @@ private enum ListenerExceptionLoggingStrategy {
         AFTER_RETRIES_EXHAUSTED
 
     }
+
+    /**
+     * Implement this interface to create each {@link DeadLetterPublishingRecoverer}.
+     *
+     * @since 3.0.9
+     */
+    @FunctionalInterface
+    public interface DeadLetterPublisherCreator {
+
+        /**
+         * Create a {@link DeadLetterPublishingRecoverer} using the supplied properties.
+         * @param templateResolver the template resolver.
+         * @param destinationResolver the destination resolver.
+         * @return the publisher.
+         */
+        DeadLetterPublishingRecoverer create(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver);
+
+    }
+
 }

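Because `DeadLetterPublisherCreator.create(...)` mirrors the new two-argument `DeadLetterPublishingRecoverer` constructor, a constructor reference satisfies the functional interface whenever a subclass constructor has the same shape. A hedged sketch of wiring the factory directly, assuming an available `destinationTopicResolver`, a listener id of `"myMainListener"`, and a `CustomDLPR` subclass like the one in the test below:

[source, java]
----
DeadLetterPublishingRecovererFactory factory =
        new DeadLetterPublishingRecovererFactory(destinationTopicResolver);
// every recoverer this factory creates will now be a CustomDLPR
factory.setDeadLetterPublisherCreator(CustomDLPR::new);
DeadLetterPublishingRecoverer recoverer = factory.create("myMainListener");
----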
spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationIntegrationTests.java

Lines changed: 34 additions & 2 deletions
@@ -1,5 +1,5 @@
 /*
- * Copyright 2021-2022 the original author or authors.
+ * Copyright 2021-2023 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -24,10 +24,15 @@
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.function.Function;
 
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,12 +42,15 @@
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaAdmin;
+import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
 import org.springframework.kafka.test.EmbeddedKafkaBroker;
 import org.springframework.kafka.test.context.EmbeddedKafka;
 import org.springframework.kafka.test.utils.KafkaTestUtils;
@@ -66,7 +74,8 @@ class RetryTopicConfigurationIntegrationTests {
     @Test
     void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFactory<Integer, String> cf,
             @Autowired KafkaTemplate<Integer, String> template, @Autowired Config config,
-            @Autowired RetryTopicComponentFactory componentFactory) throws InterruptedException {
+            @Autowired RetryTopicComponentFactory componentFactory, @Autowired KafkaListenerEndpointRegistry registry)
+                    throws InterruptedException {
 
         Consumer<Integer, String> consumer = cf.createConsumer("grp2", "");
         Map<String, List<PartitionInfo>> topics = consumer.listTopics();
@@ -76,6 +85,11 @@ void includeTopic(@Autowired EmbeddedKafkaBroker broker, @Autowired ConsumerFact
         template.send(TOPIC1, "foo");
         assertThat(config.latch.await(10, TimeUnit.SECONDS)).isTrue();
         verify(componentFactory).destinationTopicResolver();
+        assertThat(registry.getListenerContainer(TOPIC1))
+                .extracting("commonErrorHandler")
+                .extracting("failureTracker")
+                .extracting("recoverer")
+                .isInstanceOf(CustomDLPR.class);
     }
 
     @Configuration(proxyBeanMethods = false)
@@ -139,11 +153,29 @@ RetryTopicConfiguration retryTopicConfiguration1(KafkaTemplate<Integer, String>
                     .create(template);
         }
 
+        @Override
+        protected java.util.function.Consumer<DeadLetterPublishingRecovererFactory>
+                configureDeadLetterPublishingContainerFactory() {
+
+            return (factory) -> factory.setDeadLetterPublisherCreator(
+                    (templateResolver, destinationResolver) ->
+                            new CustomDLPR(templateResolver, destinationResolver));
+        }
+
         @Bean
         TaskScheduler sched() {
             return new ThreadPoolTaskScheduler();
         }
 
     }
 
+    static class CustomDLPR extends DeadLetterPublishingRecoverer {
+
+        CustomDLPR(Function<ProducerRecord<?, ?>, KafkaOperations<?, ?>> templateResolver,
+                BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
+            super(templateResolver, destinationResolver);
+        }
+
+    }
+
 }
