Skip to content

Commit b9599a8

Browse files
authored
Add getEndOffsets() to KafkaTestUtils
**cherry-pick to 2.6.x** * Docs and minor polishing.
1 parent 043090d commit b9599a8

File tree

3 files changed

+55
-4
lines changed

3 files changed

+55
-4
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/utils/KafkaTestUtils.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.time.Duration;
2020
import java.util.ArrayList;
21+
import java.util.Arrays;
22+
import java.util.Collection;
2123
import java.util.Collections;
2224
import java.util.HashMap;
2325
import java.util.Iterator;
@@ -36,6 +38,7 @@
3638
import org.apache.kafka.clients.consumer.KafkaConsumer;
3739
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3840
import org.apache.kafka.clients.producer.ProducerConfig;
41+
import org.apache.kafka.common.PartitionInfo;
3942
import org.apache.kafka.common.TopicPartition;
4043
import org.apache.kafka.common.serialization.IntegerDeserializer;
4144
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -239,6 +242,39 @@ public static OffsetAndMetadata getCurrentOffset(String brokerAddresses, String
239242
}
240243
}
241244

245+
/**
246+
* Return the end offsets of the requested topic/partitions
247+
* @param <K> the key type.
248+
* @param <V> the value type.
249+
* @param consumer the consumer.
250+
* @param topic the topic.
251+
* @param partitions the partitions, or null for all partitions.
252+
* @return the map of end offsets.
253+
* @since 2.6.5
254+
* @see Consumer#endOffsets(Collection, Duration)
255+
*/
256+
public static Map<TopicPartition, Long> getEndOffsets(Consumer<?, ?> consumer, String topic,
257+
Integer... partitions) {
258+
259+
Collection<TopicPartition> tps;
260+
if (partitions == null || partitions.length == 0) {
261+
Map<String, List<PartitionInfo>> parts = consumer.listTopics(Duration.ofSeconds(10));
262+
tps = parts.entrySet()
263+
.stream()
264+
.filter(entry -> entry.getKey().equals(topic))
265+
.flatMap(entry -> entry.getValue().stream())
266+
.map(pi -> new TopicPartition(topic, pi.partition()))
267+
.collect(Collectors.toList());
268+
}
269+
else {
270+
Assert.noNullElements(partitions, "'partitions' cannot have null elements");
271+
tps = Arrays.stream(partitions)
272+
.map(part -> new TopicPartition(topic, part))
273+
.collect(Collectors.toList());
274+
}
275+
return consumer.endOffsets(tps, Duration.ofSeconds(10));
276+
}
277+
242278
/**
243279
* Poll the consumer for records.
244280
* @param consumer the consumer.

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.kafka.clients.consumer.KafkaConsumer;
2727
import org.apache.kafka.clients.producer.KafkaProducer;
2828
import org.apache.kafka.clients.producer.ProducerRecord;
29+
import org.apache.kafka.common.TopicPartition;
2930
import org.junit.jupiter.api.Test;
3031

3132
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -44,22 +45,26 @@ public class KafkaTestUtilsTests {
4445
void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
4546
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
4647
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
47-
producer.send(new ProducerRecord<>("singleTopic1", 1, "foo"));
48-
producer.send(new ProducerRecord<>("singleTopic2", 1, "foo"));
48+
producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
49+
producer.send(new ProducerRecord<>("singleTopic2", 0, 1, "foo"));
4950
producer.close();
5051
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests1", "false", broker);
5152
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
5253
broker.consumeFromAllEmbeddedTopics(consumer);
5354
KafkaTestUtils.getSingleRecord(consumer, "singleTopic1");
5455
KafkaTestUtils.getSingleRecord(consumer, "singleTopic2");
56+
Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "singleTopic1");
57+
assertThat(endOffsets).hasSize(2);
58+
assertThat(endOffsets.get(new TopicPartition("singleTopic1", 0))).isEqualTo(1L);
59+
assertThat(endOffsets.get(new TopicPartition("singleTopic1", 1))).isEqualTo(0L);
5560
consumer.close();
5661
}
5762

5863
@Test
5964
void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
6065
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
6166
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
62-
producer.send(new ProducerRecord<>("singleTopic4", 1, "foo"));
67+
producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
6368
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("ktuTests2", "false", broker);
6469
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
6570
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
@@ -71,6 +76,10 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7176
producer.close();
7277
KafkaTestUtils.getSingleRecord(consumer, "singleTopic4");
7378
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5");
79+
Map<TopicPartition, Long> endOffsets = KafkaTestUtils.getEndOffsets(consumer, "singleTopic4", 0, 1);
80+
assertThat(endOffsets).hasSize(2);
81+
assertThat(endOffsets.get(new TopicPartition("singleTopic4", 0))).isEqualTo(1L);
82+
assertThat(endOffsets.get(new TopicPartition("singleTopic4", 1))).isEqualTo(0L);
7483
consumer.close();
7584
}
7685

src/reference/asciidoc/testing.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,16 @@
33

44
The `spring-kafka-test` jar contains some useful utilities to assist with testing your applications.
55

6+
[[ktu]]
7+
==== KafkaTestUtils
8+
9+
`o.s.kafka.test.utils.KafkaTestUtils` provides a number of static helper methods to consume records, retrieve various record offsets, and others.
10+
Refer to its https://docs.spring.io/spring-kafka/docs/current/api/org/springframework/kafka/test/utils/KafkaTestUtils.html[Javadocs] for complete details.
11+
612
[[junit]]
713
==== JUnit
814

9-
`o.s.kafka.test.utils.KafkaTestUtils` provides some static methods to set up producer and consumer properties.
15+
`o.s.kafka.test.utils.KafkaTestUtils` also provides some static methods to set up producer and consumer properties.
1016
The following listing shows those method signatures:
1117

1218
====

0 commit comments

Comments
 (0)