Skip to content

Commit 8cffe22

Browse files
authored
GH-2978: EmbeddedKafkaKRaft partition propagation
Fixes: #2978 If we don't create topics manually, that can be done automatically on the broker side according to its configuration. For that goal the `EmbeddedKafkaKraftBroker` is missing to populate `KafkaConfig.NumPartitionsProp(): "" + this.partitionsPerTopic` broker property from `@EmbeddedKafka` configuration * Propagate `partitionsPerTopic` option down to the embedded broker(s) in the `EmbeddedKafkaKraftBroker` * Some other simple refactoring in the `EmbeddedKafkaKraftBroker` * Verify the option propagated via new unit test in the `KafkaTestUtilsTests.topicAutomaticallyCreatedWithProperNumberOfPartitions()`
1 parent 359ce30 commit 8cffe22

File tree

2 files changed

+42
-17
lines changed

2 files changed

+42
-17
lines changed

spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -199,7 +199,7 @@ public void setAdminTimeout(int adminTimeout) {
199199
public void afterPropertiesSet() {
200200
if (this.initialized.compareAndSet(false, true)) {
201201
overrideExitMethods();
202-
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
202+
addDefaultBrokerPropsIfAbsent();
203203
start();
204204
}
205205
}
@@ -252,10 +252,11 @@ public void destroy() {
252252
this.cluster = null;
253253
}
254254

255-
private void addDefaultBrokerPropsIfAbsent(Properties brokerConfig, int numBrokers) {
256-
brokerConfig.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
257-
brokerConfig.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
258-
brokerConfig.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), String.valueOf(numBrokers));
255+
private void addDefaultBrokerPropsIfAbsent() {
256+
this.brokerProperties.putIfAbsent(KafkaConfig.DeleteTopicEnableProp(), "true");
257+
this.brokerProperties.putIfAbsent(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
258+
this.brokerProperties.putIfAbsent(KafkaConfig.OffsetsTopicReplicationFactorProp(), "" + this.count);
259+
this.brokerProperties.putIfAbsent(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
259260
}
260261

261262
private void logDir(Properties brokerConfigProperties) {

spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019-2022 the original author or authors.
2+
* Copyright 2019-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,32 +20,36 @@
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121

2222
import java.time.Duration;
23+
import java.util.List;
2324
import java.util.Map;
2425

2526
import org.apache.kafka.clients.admin.AdminClient;
2627
import org.apache.kafka.clients.consumer.ConsumerConfig;
2728
import org.apache.kafka.clients.consumer.ConsumerRecord;
2829
import org.apache.kafka.clients.consumer.ConsumerRecords;
2930
import org.apache.kafka.clients.consumer.KafkaConsumer;
31+
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
3032
import org.apache.kafka.clients.producer.KafkaProducer;
3133
import org.apache.kafka.clients.producer.ProducerRecord;
3234
import org.apache.kafka.common.TopicPartition;
35+
import org.apache.kafka.common.TopicPartitionInfo;
3336
import org.junit.jupiter.api.Test;
3437

3538
import org.springframework.kafka.test.EmbeddedKafkaBroker;
3639
import org.springframework.kafka.test.context.EmbeddedKafka;
3740

3841
/**
3942
* @author Gary Russell
43+
* @author Artem Bilan
4044
* @since 2.2.7
4145
*
4246
*/
43-
@EmbeddedKafka(topics = { "singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5",
44-
"multiTopic1" })
47+
@EmbeddedKafka(topics = {"singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5",
48+
"multiTopic1"})
4549
public class KafkaTestUtilsTests {
4650

4751
@Test
48-
void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
52+
void testGetSingleWithMoreThanOneTopic(EmbeddedKafkaBroker broker) {
4953
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
5054
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
5155
producer.send(new ProducerRecord<>("singleTopic1", 0, 1, "foo"));
@@ -64,7 +68,7 @@ void testGetSingleWithMoreThatOneTopic(EmbeddedKafkaBroker broker) {
6468
}
6569

6670
@Test
67-
void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
71+
void testGetSingleWithMoreThanOneTopicRecordNotThereYet(EmbeddedKafkaBroker broker) {
6872
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
6973
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
7074
producer.send(new ProducerRecord<>("singleTopic4", 0, 1, "foo"));
@@ -73,7 +77,7 @@ void testGetSingleWithMoreThatOneTopicRecordNotThereYet(EmbeddedKafkaBroker brok
7377
broker.consumeFromEmbeddedTopics(consumer, "singleTopic4", "singleTopic5");
7478
long t1 = System.currentTimeMillis();
7579
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() ->
76-
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
80+
KafkaTestUtils.getSingleRecord(consumer, "singleTopic5", Duration.ofSeconds(2)));
7781
assertThat(System.currentTimeMillis() - t1).isGreaterThanOrEqualTo(2000L);
7882
producer.send(new ProducerRecord<>("singleTopic5", 1, "foo"));
7983
producer.close();
@@ -97,19 +101,19 @@ public void testGetOneRecord(EmbeddedKafkaBroker broker) throws Exception {
97101
assertThat(oneRecord.value()).isEqualTo("foo");
98102
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
99103
.isNotNull()
100-
.extracting(omd -> omd.offset())
104+
.extracting(OffsetAndMetadata::offset)
101105
.isEqualTo(1L);
102106
oneRecord = KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "getOne",
103107
"singleTopic3", 0, true, true, Duration.ofSeconds(10));
104108
assertThat(oneRecord.value()).isEqualTo("foo");
105109
assertThat(KafkaTestUtils.getCurrentOffset(broker.getBrokersAsString(), "getOne", "singleTopic3", 0))
106110
.isNotNull()
107-
.extracting(omd -> omd.offset())
111+
.extracting(OffsetAndMetadata::offset)
108112
.isEqualTo(1L);
109113
}
110114

111115
@Test
112-
public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
116+
public void testMultiMinRecords(EmbeddedKafkaBroker broker) {
113117
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
114118
KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps);
115119
producer.send(new ProducerRecord<>("multiTopic1", 0, 1, "foo"));
@@ -135,16 +139,36 @@ public void testMultiMinRecords(EmbeddedKafkaBroker broker) throws Exception {
135139
public void testGetCurrentOffsetWithAdminClient(EmbeddedKafkaBroker broker) throws Exception {
136140
Map<String, Object> adminClientProps = Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
137141
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
138-
try (AdminClient adminClient = AdminClient.create(adminClientProps); KafkaProducer<Integer, String> producer = new KafkaProducer<>(producerProps)) {
142+
try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) {
139143
producer.send(new ProducerRecord<>("singleTopic3", 0, 1, "foo"));
140144

141145
KafkaTestUtils.getOneRecord(broker.getBrokersAsString(), "testGetCurrentOffsetWithAdminClient",
142146
"singleTopic3", 0, false, true, Duration.ofSeconds(10));
143147
assertThat(KafkaTestUtils.getCurrentOffset(adminClient, "testGetCurrentOffsetWithAdminClient", "singleTopic3", 0))
144148
.isNotNull()
145-
.extracting(omd -> omd.offset())
149+
.extracting(OffsetAndMetadata::offset)
146150
.isEqualTo(1L);
147151
}
152+
}
153+
154+
@Test
155+
public void topicAutomaticallyCreatedWithProperNumberOfPartitions(EmbeddedKafkaBroker broker) throws Exception {
156+
Map<String, Object> producerProps = KafkaTestUtils.producerProps(broker);
157+
158+
Map<String, Object> adminClientProps =
159+
Map.of(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker.getBrokersAsString());
160+
try (var adminClient = AdminClient.create(adminClientProps); var producer = new KafkaProducer<>(producerProps)) {
161+
producer.send(new ProducerRecord<>("auto-topic", "test data")).get();
162+
163+
List<TopicPartitionInfo> partitions =
164+
adminClient.describeTopics(List.of("auto-topic"))
165+
.allTopicNames()
166+
.get()
167+
.get("auto-topic")
168+
.partitions();
169+
170+
assertThat(partitions).hasSize(2);
171+
}
148172

149173
}
150174

0 commit comments

Comments
 (0)