Skip to content

Commit 524f585

Browse files
authored
GH-1741: Add Message<?> sendAndReceive(Message<?>)
Resolves #1741 `ReplyingKafkaTemplate` enable send and receive with `Message<?>`. - also fix ToC level for new chapter - also polishing some tests for left over state * Revert RetryTopicConfigurerTests * Polish Kotlin Snippets; fix Javadoc. * Doc Polishing.
1 parent 24d1601 commit 524f585

File tree

21 files changed

+688
-64
lines changed

21 files changed

+688
-64
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ project ('spring-kafka-docs') {
356356
dependencies {
357357
api "org.springframework.boot:spring-boot-starter:$springBootVersion"
358358
api project (':spring-kafka')
359+
optionalApi 'com.fasterxml.jackson.core:jackson-core'
360+
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
359361
}
360362

361363
compileKotlin {

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,8 @@ RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
545545
----
546546
====
547547

548+
(Also see <<exchanging-messages>>).
549+
548550
The result is a `ListenableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
549551
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
550552
You can use this future to determine the result of the send operation.
@@ -762,6 +764,57 @@ These header names are used by the `@KafkaListener` infrastructure to route the
762764
Starting with version 2.3, you can customize the header names - the template has 3 properties `correlationHeaderName`, `replyTopicHeaderName`, and `replyPartitionHeaderName`.
763765
This is useful if your server is not a Spring application (or does not use the `@KafkaListener`).
764766

767+
[[exchanging-messages]]
768+
====== Request/Reply with `Message<?>` s
769+
770+
Version 2.7 added methods to the `ReplyingKafkaTemplate` to send and receive `spring-messaging` 's `Message<?>` abstraction:
771+
772+
====
773+
[source, java]
774+
----
775+
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
776+
777+
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
778+
ParameterizedTypeReference<P> returnType);
779+
----
780+
====
781+
782+
These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.
783+
784+
Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.
785+
786+
Use the second method if you need to provide type information for the return type, to assist the message converter.
787+
This also allows the same template to receive different types, even if there is no type metadata in the replies, such as when the server side is not a Spring application.
788+
The following is an example of the latter:
789+
790+
.Template Bean
791+
====
792+
[source, java, role="primary", indent=0]
793+
.Java
794+
----
795+
include::{java-examples}/requestreply/Application.java[tag=beans]
796+
----
797+
[source, kotlin, role="secondary",indent=0]
798+
.Kotlin
799+
----
800+
include::{kotlin-examples}/requestreply/Application.kt[tag=beans]
801+
----
802+
====
803+
804+
.Using the template
805+
====
806+
[source, java, role="primary", indent=0]
807+
.Java
808+
----
809+
include::{java-examples}/requestreply/Application.java[tag=sendReceive]
810+
----
811+
[source, kotlin, role="secondary", indent=0]
812+
.Kotlin
813+
----
814+
include::{kotlin-examples}/requestreply/Application.kt[tag=sendReceive]
815+
----
816+
====
817+
765818
[[reply-message]]
766819
===== Reply Type Message<?>
767820

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
[[retry-topic]]
2-
== Non-Blocking Retries
2+
=== Non-Blocking Retries
33

44
IMPORTANT: This is an experimental feature and the usual rule of no breaking API changes does not apply to this feature until the experimental designation is removed.
55
Users are encouraged to try out the feature and provide feedback via GitHub Issues or GitHub discussions.
66

77
Achieving non-blocking retry / dlt functionality with Kafka usually requires setting up extra topics and creating and configuring the corresponding listeners.
88
Since 2.7 Spring for Apache Kafka offers support for that via the `@RetryableTopic` annotation and `RetryTopicConfiguration` class to simplify that bootstrapping.
99

10-
=== How The Pattern Works
10+
==== How The Pattern Works
1111

1212
If message processing fails, the message is forwarded to a retry topic with a back off timestamp.
1313
The retry topic consumer then checks the timestamp and if it's not due it pauses the consumption for that topic's partition.
@@ -23,9 +23,9 @@ IMPORTANT: You can set the `AckMode` mode you prefer, but `RECORD` is suggested.
2323

2424
IMPORTANT: At this time this functionality doesn't support class level `@KafkaListener` annotations
2525

26-
=== Back Off Delay Precision
26+
==== Back Off Delay Precision
2727

28-
==== Overview and Guarantees
28+
===== Overview and Guarantees
2929

3030
All message processing and backing off is handled by the consumer thread, and, as such, delay precision is guaranteed on a best-effort basis.
3131
If one message's processing takes longer than the next message's back off period for that consumer, the next message's delay will be higher than expected.
@@ -36,7 +36,7 @@ That being said, for consumers handling a single partition the message's process
3636

3737
IMPORTANT: It is guaranteed that a message will never be processed before its due time.
3838

39-
==== Tuning the Delay Precision
39+
===== Tuning the Delay Precision
4040

4141
The message's processing delay precision relies on two `ContainerProperties`: `ContainerProperties.pollTimeout` and `ContainerProperties.idlePartitionEventInterval`.
4242
Both properties will be automatically set in the retry topic and dlt's `ListenerContainerFactory` to one quarter of the smallest delay value for that topic, with a minimum value of 250ms and a maximum value of 5000ms.
@@ -45,9 +45,9 @@ This way you can tune the precision and performance for the retry topics if you
4545

4646
NOTE: You can have separate `ListenerContainerFactory` instances for the main and retry topics - this way you can have different settings to better suit your needs, such as having a higher polling timeout setting for the main topics and a lower one for the retry topics.
4747

48-
=== Configuration
48+
==== Configuration
4949

50-
==== Using the `@RetryableTopic` annotation
50+
===== Using the `@RetryableTopic` annotation
5151

5252
To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring Kafka will bootstrap all the necessary topics and consumers with the default configurations.
5353

@@ -78,7 +78,7 @@ public void processMessage(MyPojo message) {
7878
NOTE: If you don't specify a kafkaTemplate name a bean with name `retryTopicDefaultKafkaTemplate` will be looked up.
7979
If no bean is found an exception is thrown.
8080

81-
==== Using `RetryTopicConfiguration` beans
81+
===== Using `RetryTopicConfiguration` beans
8282

8383
You can also configure the non-blocking retry support by creating `RetryTopicConfiguration` beans in a `@Configuration` annotated class.
8484

@@ -126,11 +126,11 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPo
126126

127127
NOTE: The retry topics' and dlt's consumers will be assigned to a consumer group with a group id that is the combination of the one with you provide in the `groupId` parameter of the `@KafkaListener` annotation with the topic's suffix. If you don't provide any they'll all belong to the same group, and rebalance on a retry topic will cause an unnecessary rebalance on the main topic.
128128

129-
=== Features
129+
==== Features
130130

131131
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
132132

133-
==== BackOff Configuration
133+
===== BackOff Configuration
134134

135135
The BackOff configuration relies on the `BackOffPolicy` interface from the `Spring Retry` project.
136136

@@ -187,7 +187,7 @@ NOTE: The default backoff policy is FixedBackOffPolicy with a maximum of 3 attem
187187

188188
IMPORTANT: The first attempt counts against the maxAttempts, so if you provide a maxAttempts value of 4 there'll be the original attempt plus 3 retries.
189189

190-
==== Single Topic Fixed Delay Retries
190+
===== Single Topic Fixed Delay Retries
191191

192192
If you're using fixed delay policies such as `FixedBackOffPolicy` or `NoBackOffPolicy` you can use a single topic to accomplish the non-blocking retries.
193193
This topic will be suffixed with the provided or default suffix, and will not have either the index or the delay values appended.
@@ -220,7 +220,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
220220

221221
NOTE: The default behavior is creating separate retry topics for each attempt, appended with their index value: retry-0, retry-1, ...
222222

223-
==== Global timeout
223+
===== Global timeout
224224

225225
You can set the global timeout for the retrying process.
226226
If that time is reached, the next time the consumer throws an exception the message goes straight to the DLT, or just ends the processing if no DLT is available.
@@ -252,7 +252,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
252252

253253
NOTE: The default is having no timeout set, which can also be achieved by providing -1 as the timout value.
254254

255-
==== Exception Classifier
255+
===== Exception Classifier
256256

257257
You can specify which exceptions you want to retry on and which not to.
258258
You can also set it to traverse the causes to lookup nested exceptions.
@@ -284,7 +284,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t
284284

285285
NOTE: The default behavior is retrying on all exceptions and not traversing causes.
286286

287-
==== Include and Exclude Topics
287+
===== Include and Exclude Topics
288288

289289
You can decide which topics will and will not be handled by a `RetryTopicConfiguration` bean via the .includeTopic(String topic), .includeTopics(Collection<String> topics) .excludeTopic(String topic) and .excludeTopics(Collection<String> topics) methods.
290290

@@ -312,7 +312,7 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
312312
NOTE: The default behavior is to include all topics.
313313

314314

315-
==== Topics AutoCreation
315+
===== Topics AutoCreation
316316

317317
Unless otherwise specified the framework will auto create the required topics using `NewTopic` beans that are consumed by the `KafkaAdmin` bean.
318318
You can specify the number of partitions and the replication factor with which the topics will be created, and you can turn this feature off.
@@ -357,7 +357,7 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
357357
NOTE: By default the topics are autocreated with one partition and a replication factor of one.
358358

359359

360-
=== Topic Naming
360+
==== Topic Naming
361361

362362
Retry topics and DLT are named by suffixing the main topic with a provided or default value, appended by either the delay or index for that topic.
363363

@@ -367,7 +367,7 @@ Examples:
367367

368368
"my-other-topic" -> "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", ..., "my-topic-myDltSuffix".
369369

370-
==== Retry Topics and Dlt Suffixes
370+
===== Retry Topics and Dlt Suffixes
371371

372372
You can specify the suffixes that will be used by the retry and dlt topics.
373373

@@ -398,7 +398,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> t
398398

399399
NOTE: The default suffixes are "-retry" and "-dlt", for retry topics and dlt respectively.
400400

401-
==== Appending the Topic's Index or Delay
401+
===== Appending the Topic's Index or Delay
402402

403403
You can either append the topic's index or delay values after the suffix.
404404

@@ -428,11 +428,11 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> templa
428428

429429
NOTE: The default behavior is to suffix with the delay values, except for fixed delay configurations with multiple topics, in which case the topics are suffixed with the topic's index.
430430

431-
=== Dlt Strategies
431+
==== Dlt Strategies
432432

433433
The framework provides a few strategies for working with DLTs. You can provide a method for DLT processing, use the default logging method, or have no DLT at all. Also you can choose what happens if DLT processing fails.
434434

435-
==== Dlt Processing Method
435+
===== Dlt Processing Method
436436

437437
You can specify the method used to process the Dlt for the topic, as well as the behavior if that processing fails.
438438

@@ -487,7 +487,7 @@ public class MyCustomDltProcessor {
487487

488488
NOTE: If no DLT handler is provided, the default RetryTopicConfigurer.LoggingDltListenerHandlerMethod is used.
489489

490-
==== Dlt Failure Behavior
490+
===== Dlt Failure Behavior
491491

492492
Should the Dlt processing fail, there are two possible behaviors available: `ALWAYS_RETRY_ON_ERROR` and `FAIL_ON_ERROR`.
493493

@@ -521,7 +521,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
521521

522522
NOTE: The default behavior is to `ALWAYS_RETRY_ON_ERROR`.
523523

524-
==== Configuring No Dlt
524+
===== Configuring No Dlt
525525

526526
The framework also provides the possibility of not configuring a DLT for the topic.
527527
In this case after retrials are exhausted the processing simply ends.
@@ -551,7 +551,7 @@ public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> templ
551551
====
552552

553553

554-
=== Specifying a ListenerContainerFactory
554+
==== Specifying a ListenerContainerFactory
555555

556556
By default the RetryTopic configuration will use the provided factory from the `@KafkaListener` annotation, but you can specify a different one to be used to create the retry topic and dlt listener containers.
557557

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ See <<transactions>> for more information.
5454
==== `ReplyingKafkaTemplate` Changes
5555

5656
There is now a mechanism to examine a reply and fail the future exceptionally if some condition exists.
57+
58+
Support for sending and receiving `spring-messaging` `Message<?>` s has been added.
59+
5760
See <<replying-template>> for more information.
5861

5962
[[x27-streams]]

0 commit comments

Comments
 (0)