Skip to content

Commit 3286f5f

Browse files
garyrussellartembilan
authored andcommitted
GH-1725: KafkaAdmin Improvements
Resolves #1725 - add `NewTopics` collection wrapper for auto topic creation - add `createOrModifyTopics()` - add `describeTopics()` - introduce `KafkaAdminOperations`
1 parent ca2cb94 commit 3286f5f

File tree

8 files changed

+241
-88
lines changed

8 files changed

+241
-88
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,23 @@ include::{kotlin-examples}/topics/Config.kt[tag=brokerProps]
105105
----
106106
====
107107

108-
IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` `@Bean` s.
108+
Starting with version 2.7, you can declare multiple `NewTopic` s in a single `KafkaAdmin.NewTopics` bean definition:
109+
110+
====
111+
[source, java, indent=0, role="primary"]
112+
.Java
113+
----
114+
include::{java-examples}/topics/Config.java[tag=newTopicsBean]
115+
----
116+
[source, kotlin, indent=0, role="secondary"]
117+
.Kotlin
118+
----
119+
include::{kotlin-examples}/topics/Config.kt[tag=newTopicsBean]
120+
----
121+
====
122+
123+
124+
IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean` s.
109125

110126
By default, if the broker is not available, a message is logged, but the context continues to load.
111127
You can programmatically invoke the admin's `initialize()` method to try again later.
@@ -114,6 +130,11 @@ The context then fails to initialize.
114130

115131
NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`.
116132

133+
Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime.
134+
135+
* `createOrModifyTopics`
136+
* `describeTopics`
137+
117138
For more advanced features, you can use the `AdminClient` directly.
118139
The following example shows how to do so:
119140

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,3 +61,10 @@ See <<replying-template>> for more information.
6161

6262
By default, the `StreamsBuilderFactoryBean` is now configured to not clean up local state.
6363
See <<streams-config>> for more information.
64+
65+
[[x27-admin]]
66+
==== `KafkaAdmin` Changes
67+
68+
New methods `createOrModifyTopics` and `describeTopics` have been added.
69+
`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean.
70+
See <<kafka-admin>> for more information.

spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/topics/Config.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.springframework.context.annotation.Bean;
2828
import org.springframework.kafka.config.TopicBuilder;
2929
import org.springframework.kafka.core.KafkaAdmin;
30+
import org.springframework.kafka.core.KafkaAdmin.NewTopics;
3031

3132
/**
3233
* Snippet for Configuring Topics section.
@@ -37,7 +38,7 @@
3738
*/
3839
public class Config {
3940

40-
// tag::topicBeans[]
41+
// tag::topicBeans[]
4142
@Bean
4243
public KafkaAdmin admin() {
4344
Map<String, Object> configs = new HashMap<>();
@@ -74,25 +75,39 @@ public NewTopic topic3() {
7475
}
7576
// end::topicBeans[]
7677
// tag::brokerProps[]
77-
@Bean
78-
public NewTopic topic4() {
79-
return TopicBuilder.name("defaultBoth")
80-
.build();
81-
}
78+
@Bean
79+
public NewTopic topic4() {
80+
return TopicBuilder.name("defaultBoth")
81+
.build();
82+
}
8283

83-
@Bean
84-
public NewTopic topic5() {
85-
return TopicBuilder.name("defaultPart")
86-
.replicas(1)
87-
.build();
88-
}
84+
@Bean
85+
public NewTopic topic5() {
86+
return TopicBuilder.name("defaultPart")
87+
.replicas(1)
88+
.build();
89+
}
8990

90-
@Bean
91-
public NewTopic topic6() {
92-
return TopicBuilder.name("defaultRepl")
93-
.partitions(3)
94-
.build();
95-
}
96-
// end::brokerProps[]
91+
@Bean
92+
public NewTopic topic6() {
93+
return TopicBuilder.name("defaultRepl")
94+
.partitions(3)
95+
.build();
96+
}
97+
// end::brokerProps[]
98+
// tag::newTopicsBean[]
99+
@Bean
100+
public KafkaAdmin.NewTopics topics456() {
101+
return new NewTopics(
102+
TopicBuilder.name("defaultBoth")
103+
.build(),
104+
TopicBuilder.name("defaultPart")
105+
.replicas(1)
106+
.build(),
107+
TopicBuilder.name("defaultRepl")
108+
.partitions(3)
109+
.build());
110+
}
111+
// end::newTopicsBean[]
97112

98113
}

spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/topics/Config.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,18 @@ class Config {
7171
@Bean
7272
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
7373
// end::brokerProps[]
74+
// tag::newTopicsBean[]
75+
@Bean
76+
fun topics456() = KafkaAdmin.NewTopics(
77+
TopicBuilder.name("defaultBoth")
78+
.build(),
79+
TopicBuilder.name("defaultPart")
80+
.replicas(1)
81+
.build(),
82+
TopicBuilder.name("defaultRepl")
83+
.partitions(3)
84+
.build()
85+
)
86+
// end::newTopicsBean[]
7487

7588
}

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 69 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2020 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.time.Duration;
2020
import java.util.ArrayList;
21+
import java.util.Arrays;
2122
import java.util.Collection;
2223
import java.util.Collections;
2324
import java.util.HashMap;
@@ -56,7 +57,8 @@
5657
*
5758
* @since 1.3
5859
*/
59-
public class KafkaAdmin extends KafkaResourceFactory implements ApplicationContextAware, SmartInitializingSingleton {
60+
public class KafkaAdmin extends KafkaResourceFactory
61+
implements ApplicationContextAware, SmartInitializingSingleton, KafkaAdminOperations {
6062

6163
/**
6264
* The default close timeout duration as 10 seconds.
@@ -129,11 +131,7 @@ public void setAutoCreate(boolean autoCreate) {
129131
this.autoCreate = autoCreate;
130132
}
131133

132-
133-
/**
134-
* Get an unmodifiable copy of this admin's configuration.
135-
* @return the configuration map.
136-
*/
134+
@Override
137135
public Map<String, Object> getConfigurationProperties() {
138136
Map<String, Object> configs2 = new HashMap<>(this.configs);
139137
checkBootstrap(configs2);
@@ -158,13 +156,14 @@ public void afterSingletonsInstantiated() {
158156
* @see #setAutoCreate(boolean)
159157
*/
160158
public final boolean initialize() {
161-
Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();
159+
Collection<NewTopic> newTopics = new ArrayList<>(
160+
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
161+
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
162+
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
162163
if (newTopics.size() > 0) {
163164
AdminClient adminClient = null;
164165
try {
165-
Map<String, Object> configs2 = new HashMap<>(this.configs);
166-
checkBootstrap(configs2);
167-
adminClient = AdminClient.create(configs2);
166+
adminClient = createAdmin();
168167
}
169168
catch (Exception e) {
170169
if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
@@ -176,7 +175,7 @@ public final boolean initialize() {
176175
}
177176
if (adminClient != null) {
178177
try {
179-
addTopicsIfNeeded(adminClient, newTopics);
178+
addOrModifyTopicsIfNeeded(adminClient, newTopics);
180179
return true;
181180
}
182181
catch (Exception e) {
@@ -197,7 +196,39 @@ public final boolean initialize() {
197196
return false;
198197
}
199198

200-
private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
199+
@Override
200+
public void createOrModifyTopics(NewTopic... topics) {
201+
try (AdminClient client = createAdmin()) {
202+
addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
203+
}
204+
}
205+
206+
@Override
207+
public Map<String, TopicDescription> describeTopics(String... topicNames) {
208+
try (AdminClient admin = createAdmin()) {
209+
Map<String, TopicDescription> results = new HashMap<>();
210+
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
211+
try {
212+
results.putAll(topics.all().get(this.operationTimeout, TimeUnit.SECONDS));
213+
return results;
214+
}
215+
catch (InterruptedException ie) {
216+
Thread.currentThread().interrupt();
217+
throw new KafkaException("Interrupted while getting topic descriptions", ie);
218+
}
219+
catch (TimeoutException | ExecutionException ex) {
220+
throw new KafkaException("Failed to obtain topic descriptions", ex);
221+
}
222+
}
223+
}
224+
225+
private AdminClient createAdmin() {
226+
Map<String, Object> configs2 = new HashMap<>(this.configs);
227+
checkBootstrap(configs2);
228+
return AdminClient.create(configs2);
229+
}
230+
231+
private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
201232
if (topics.size() > 0) {
202233
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
203234
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
@@ -298,4 +329,29 @@ private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> to
298329
}
299330
}
300331

332+
/**
333+
* Wrapper for a collection of {@link NewTopic} to facilitated declaring multiple
334+
* topics as as single bean.
335+
*
336+
* @since 2.7
337+
*
338+
*/
339+
public static class NewTopics {
340+
341+
final Collection<NewTopic> newTopics = new ArrayList<>();
342+
343+
/**
344+
* Construct an instance with the {@link NewTopic}s.
345+
* @param newTopics the topics.
346+
*/
347+
public NewTopics(NewTopic... newTopics) {
348+
this.newTopics.addAll(Arrays.asList(newTopics));
349+
}
350+
351+
Collection<NewTopic> getNewTopics() {
352+
return this.newTopics;
353+
}
354+
355+
}
356+
301357
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.core;
18+
19+
import java.util.Map;
20+
21+
import org.apache.kafka.clients.admin.NewTopic;
22+
import org.apache.kafka.clients.admin.TopicDescription;
23+
24+
/**
25+
* Provides a number of convenience methods wrapping {@code AdminClient}.
26+
*
27+
* @author Gary Russell
28+
* @since 2.7
29+
*
30+
*/
31+
public interface KafkaAdminOperations {
32+
33+
/**
34+
* Get an unmodifiable copy of this admin's configuration.
35+
* @return the configuration map.
36+
*/
37+
Map<String, Object> getConfigurationProperties();
38+
39+
/**
40+
* Create topics if they don't exist or increase the number of partitions if needed.
41+
* @param topics the topics.
42+
*/
43+
void createOrModifyTopics(NewTopic... topics);
44+
45+
/**
46+
* Obtain {@link TopicDescription}s for these topics.
47+
* @param topicNames the topic names.
48+
* @return a map of name:topicDescription.
49+
*/
50+
Map<String, TopicDescription> describeTopics(String... topicNames);
51+
52+
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,8 @@ private Map<String, DestinationTopicHolder> correlatePairSourceAndDestinationVal
164164
private DestinationTopic getNextDestinationTopic(List<DestinationTopic> destinationList, int index) {
165165
return index != destinationList.size() - 1
166166
? destinationList.get(index + 1)
167-
: new DestinationTopic(destinationList.get(index).getDestinationName() + this.NO_OPS_SUFFIX,
168-
destinationList.get(index), this.NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS);
167+
: new DestinationTopic(destinationList.get(index).getDestinationName() + NO_OPS_SUFFIX,
168+
destinationList.get(index), NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS);
169169
}
170170

171171
@Override

0 commit comments

Comments
 (0)