Skip to content

Commit ed6673e

Browse files
authored
GH-2357: Switch to CompletableFuture
Resolves #2357 Spring Framework is planning to deprecate `ListenableFuture` in 6.0. Add methods to the `KafkaOperations` (`KafkaTemplate`) that return `CompletableFuture` instead; the `ListenableFuture` methods have now been removed in 3.0. * Remove KafkaOperations2; doc polishing. * Fix import.
1 parent a9ec919 commit ed6673e

20 files changed

+502
-235
lines changed

spring-kafka-docs/src/main/asciidoc/changes-since-1.0.adoc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,22 @@ You can now configure which inbound headers should be mapped.
3333
Also available in version 2.8.8 or later.
3434
See <<headers>> for more information.
3535

36+
[[x29-template-changes]]
37+
==== `KafkaTemplate` Changes
38+
39+
In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
40+
See <<kafka-template>> for assistance in transitioning when using this release.
41+
3642
[[x29-rkt-changes]]
3743
==== `ReplyingKafkaTemplate` Changes
3844

3945
The template now provides a method to wait for assignment on the reply container, to avoid a race when sending a request before the reply container is initialized.
4046
Also available in version 2.8.8 or later.
4147
See <<replying-template>>.
4248

49+
In 3.0, the futures returned by this class will be `CompletableFuture` s instead of `ListenableFuture` s.
50+
See <<replying-template>> and <<exchanging-messages>> for assistance in transitioning when using this release.
51+
4352
=== What's New in 2.8 Since 2.7
4453

4554
This section covers the changes made from version 2.7 to version 2.8.

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 30 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -170,25 +170,25 @@ The following listing shows the relevant methods from `KafkaTemplate`:
170170
====
171171
[source, java]
172172
----
173-
ListenableFuture<SendResult<K, V>> sendDefault(V data);
173+
CompletableFuture<SendResult<K, V>> sendDefault(V data);
174174
175-
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
175+
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
176176
177-
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
177+
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
178178
179-
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
179+
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
180180
181-
ListenableFuture<SendResult<K, V>> send(String topic, V data);
181+
CompletableFuture<SendResult<K, V>> send(String topic, V data);
182182
183-
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
183+
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
184184
185-
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
185+
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
186186
187-
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
187+
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
188188
189-
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
189+
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
190190
191-
ListenableFuture<SendResult<K, V>> send(Message<?> message);
191+
CompletableFuture<SendResult<K, V>> send(Message<?> message);
192192
193193
Map<MetricName, ? extends Metric> metrics();
194194
@@ -210,6 +210,9 @@ interface ProducerCallback<K, V, T> {
210210

211211
See the https://docs.spring.io/spring-kafka/api/org/springframework/kafka/core/KafkaTemplate.html[Javadoc] for more detail.
212212

213+
IMPORTANT: In version 3.0, the methods that previously returned `ListenableFuture` have been changed to return `CompletableFuture`.
214+
To facilitate the migration, the 2.9 version added a method `.usingCompletableFuture()` which provided the same methods with `CompletableFuture` return types; this method is no longer available.
215+
213216
The `sendDefault` API requires that a default topic has been provided to the template.
214217

215218
The API takes in a `timestamp` as a parameter and stores this timestamp in the record.
@@ -302,75 +305,27 @@ By default, the template is configured with a `LoggingProducerListener`, which l
302305

303306
For convenience, default method implementations are provided in case you want to implement only one of the methods.
304307

305-
Notice that the send methods return a `ListenableFuture<SendResult>`.
308+
Notice that the send methods return a `CompletableFuture<SendResult>`.
306309
You can register a callback with the listener to receive the result of the send asynchronously.
307310
The following example shows how to do so:
308311

309312
====
310313
[source, java]
311314
----
312-
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
313-
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
314-
315-
@Override
316-
public void onSuccess(SendResult<Integer, String> result) {
317-
...
318-
}
319-
320-
@Override
321-
public void onFailure(Throwable ex) {
322-
...
323-
}
324-
315+
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
316+
future.whenComplete((result, ex) -> {
317+
...
325318
});
326319
----
327320
====
328321

329322
`SendResult` has two properties, a `ProducerRecord` and `RecordMetadata`.
330323
See the Kafka API documentation for information about those objects.
331324

332-
The `Throwable` in `onFailure` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record.
333-
334-
Starting with version 2.5, you can use a `KafkaSendCallback` instead of a `ListenableFutureCallback`, making it easier to extract the failed `ProducerRecord`, avoiding the need to cast the `Throwable`:
335-
336-
====
337-
[source, java]
338-
----
339-
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
340-
future.addCallback(new KafkaSendCallback<Integer, String>() {
341-
342-
@Override
343-
public void onSuccess(SendResult<Integer, String> result) {
344-
...
345-
}
346-
347-
@Override
348-
public void onFailure(KafkaProducerException ex) {
349-
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
350-
...
351-
}
352-
353-
});
354-
----
355-
====
356-
357-
You can also use a pair of lambdas:
358-
359-
====
360-
[source, java]
361-
----
362-
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
363-
future.addCallback(result -> {
364-
...
365-
}, (KafkaFailureCallback<Integer, String>) ex -> {
366-
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
367-
...
368-
});
369-
----
370-
====
325+
The `Throwable` can be cast to a `KafkaProducerException`; its `failedProducerRecord` property contains the failed record.
371326

372327
If you wish to block the sending thread to await the result, you can invoke the future's `get()` method; using the method with a timeout is recommended.
373-
You may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
328+
If you have set a `linger.ms`, you may wish to invoke `flush()` before waiting or, for convenience, the template has a constructor with an `autoFlush` parameter that causes the template to `flush()` on each send.
374329
Flushing is only needed if you have set the `linger.ms` producer property and want to immediately send a partial batch.
375330

376331
====== Examples
@@ -384,19 +339,14 @@ This section shows examples of sending messages to Kafka:
384339
public void sendToKafka(final MyOutputData data) {
385340
final ProducerRecord<String, String> record = createRecord(data);
386341
387-
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
388-
future.addCallback(new KafkaSendCallback<Integer, String>() {
389-
390-
@Override
391-
public void onSuccess(SendResult<Integer, String> result) {
342+
CompletableFuture<SendResult<Integer, String>> future = template.send(record);
343+
future.whenComplete((result, ex) -> {
344+
if (ex == null) {
392345
handleSuccess(data);
393346
}
394-
395-
@Override
396-
public void onFailure(KafkaProducerException ex) {
347+
else {
397348
handleFailure(data, record, ex);
398349
}
399-
400350
});
401351
}
402352
----
@@ -549,10 +499,12 @@ RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
549499

550500
(Also see <<exchanging-messages>>).
551501

552-
The result is a `ListenableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
502+
The result is a `CompletableFuture` that is asynchronously populated with the result (or an exception, for a timeout).
553503
The result also has a `sendFuture` property, which is the result of calling `KafkaTemplate.send()`.
554504
You can use this future to determine the result of the send operation.
555505

506+
IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.
507+
556508
If the first method is used, or the `replyTimeout` argument is `null`, the template's `defaultReplyTimeout` property is used (5 seconds by default).
557509

558510
Starting with version 2.8.8, the template has a new method `waitForAssignment`.
@@ -791,6 +743,8 @@ RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
791743

792744
These will use the template's default `replyTimeout`, there are also overloaded versions that can take a timeout in the method call.
793745

746+
IMPORTANT: In version 3.0, the futures returned by these methods (and their `sendFuture` properties) have been changed to `CompletableFuture` s instead of `ListenableFuture` s.
747+
794748
Use the first method if the consumer's `Deserializer` or the template's `MessageConverter` can convert the payload without any additional information, either via configuration or type metadata in the reply message.
795749

796750
Use the second method if you need to provide type information for the return type, to assist the message converter.
@@ -2236,6 +2190,7 @@ public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerCon
22362190
====
22372191

22382192
When you use `@SendTo`, you must configure the `ConcurrentKafkaListenerContainerFactory` with a `KafkaTemplate` in its `replyTemplate` property to perform the send.
2193+
Spring Boot will automatically wire in its auto configured template (or any if a single instance is present).
22392194

22402195
NOTE: Unless you use <<replying-template,request/reply semantics>> only the simple `send(topic, value)` method is used, so you may wish to create a subclass to generate the partition or key.
22412196
The following example shows how to do so:
@@ -2248,7 +2203,7 @@ public KafkaTemplate<String, String> myReplyingTemplate() {
22482203
return new KafkaTemplate<Integer, String>(producerFactory()) {
22492204
22502205
@Override
2251-
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
2206+
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
22522207
return super.send(topic, partitionForData(data), keyForData(data), data);
22532208
}
22542209

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,15 @@ See <<retry-config>> for more information.
3434

3535
Events related to consumer authentication and authorization failures are now published by the container.
3636
See <<events>> for more information.
37+
38+
[[x30-template-changes]]
39+
==== `KafkaTemplate` Changes
40+
41+
The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
42+
See <<kafka-template>>.
43+
44+
[[x30-rkt-changes]]
45+
==== `ReplyingKafkaTemplate` Changes
46+
47+
The futures returned by this class are now `CompletableFuture` s instead of `ListenableFuture` s.
48+
See <<replying-template>> and <<exchanging-messages>>.

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.Collection;
2121
import java.util.List;
2222
import java.util.Map;
23+
import java.util.concurrent.CompletableFuture;
2324

2425
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
2526
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -36,10 +37,9 @@
3637
import org.springframework.kafka.support.TopicPartitionOffset;
3738
import org.springframework.lang.Nullable;
3839
import org.springframework.messaging.Message;
39-
import org.springframework.util.concurrent.ListenableFuture;
4040

4141
/**
42-
* The basic Kafka operations contract returning {@link ListenableFuture}s.
42+
* The basic Kafka operations contract returning {@link CompletableFuture}s.
4343
*
4444
* @param <K> the key type.
4545
* @param <V> the value type.
@@ -68,15 +68,15 @@ public interface KafkaOperations<K, V> {
6868
* @param data The data.
6969
* @return a Future for the {@link SendResult}.
7070
*/
71-
ListenableFuture<SendResult<K, V>> sendDefault(V data);
71+
CompletableFuture<SendResult<K, V>> sendDefault(V data);
7272

7373
/**
7474
* Send the data to the default topic with the provided key and no partition.
7575
* @param key the key.
7676
* @param data The data.
7777
* @return a Future for the {@link SendResult}.
7878
*/
79-
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
79+
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
8080

8181
/**
8282
* Send the data to the default topic with the provided key and partition.
@@ -85,7 +85,7 @@ public interface KafkaOperations<K, V> {
8585
* @param data the data.
8686
* @return a Future for the {@link SendResult}.
8787
*/
88-
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
88+
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
8989

9090
/**
9191
* Send the data to the default topic with the provided key and partition.
@@ -96,15 +96,15 @@ public interface KafkaOperations<K, V> {
9696
* @return a Future for the {@link SendResult}.
9797
* @since 1.3
9898
*/
99-
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
99+
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
100100

101101
/**
102102
* Send the data to the provided topic with no key or partition.
103103
* @param topic the topic.
104104
* @param data The data.
105105
* @return a Future for the {@link SendResult}.
106106
*/
107-
ListenableFuture<SendResult<K, V>> send(String topic, V data);
107+
CompletableFuture<SendResult<K, V>> send(String topic, V data);
108108

109109
/**
110110
* Send the data to the provided topic with the provided key and no partition.
@@ -113,7 +113,7 @@ public interface KafkaOperations<K, V> {
113113
* @param data The data.
114114
* @return a Future for the {@link SendResult}.
115115
*/
116-
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
116+
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
117117

118118
/**
119119
* Send the data to the provided topic with the provided key and partition.
@@ -123,7 +123,7 @@ public interface KafkaOperations<K, V> {
123123
* @param data the data.
124124
* @return a Future for the {@link SendResult}.
125125
*/
126-
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
126+
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
127127

128128
/**
129129
* Send the data to the provided topic with the provided key and partition.
@@ -135,15 +135,15 @@ public interface KafkaOperations<K, V> {
135135
* @return a Future for the {@link SendResult}.
136136
* @since 1.3
137137
*/
138-
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
138+
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
139139

140140
/**
141141
* Send the provided {@link ProducerRecord}.
142142
* @param record the record.
143143
* @return a Future for the {@link SendResult}.
144144
* @since 1.3
145145
*/
146-
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
146+
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
147147

148148
/**
149149
* Send a message with routing information in message headers. The message payload
@@ -154,7 +154,7 @@ public interface KafkaOperations<K, V> {
154154
* @see org.springframework.kafka.support.KafkaHeaders#PARTITION
155155
* @see org.springframework.kafka.support.KafkaHeaders#KEY
156156
*/
157-
ListenableFuture<SendResult<K, V>> send(Message<?> message);
157+
CompletableFuture<SendResult<K, V>> send(Message<?> message);
158158

159159
/**
160160
* See {@link Producer#partitionsFor(String)}.

0 commit comments

Comments
 (0)