Skip to content

Commit aa92c93

Browse files
authored
GH-2720: Add Predicate to KafkaAdmin
Resolves #2720 **cherry-pick to 2.9.x**
1 parent 5f68d75 commit aa92c93

File tree

3 files changed

+67
-4
lines changed

3 files changed

+67
-4
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ private KafkaAdmin admin;
152152
----
153153
====
154154

155+
Starting with versions 2.9.10, 3.0.9, you can provide a `Predicate<NewTopic>` which can be used to determine whether a particular `NewTopic` bean should be considered for creation or modification.
156+
This is useful, for example, if you have multiple `KafkaAdmin` instances pointing to different clusters and you wish to select those topics that should be created or modified by each admin.
157+
158+
====
159+
[source, java]
160+
----
161+
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));
162+
----
163+
====
164+
155165
[[sending-messages]]
156166
==== Sending Messages
157167

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.TimeoutException;
3434
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.function.Predicate;
3536
import java.util.stream.Collectors;
3637

3738
import org.apache.commons.logging.LogFactory;
@@ -62,6 +63,7 @@
6263
import org.springframework.kafka.KafkaException;
6364
import org.springframework.kafka.support.TopicForRetryable;
6465
import org.springframework.lang.Nullable;
66+
import org.springframework.util.Assert;
6567

6668
/**
6769
* An admin that delegates to an {@link AdminClient} to create topics defined
@@ -88,6 +90,8 @@ public class KafkaAdmin extends KafkaResourceFactory
8890

8991
private ApplicationContext applicationContext;
9092

93+
private Predicate<NewTopic> createOrModifyTopic = nt -> true;
94+
9195
private Duration closeTimeout = DEFAULT_CLOSE_TIMEOUT;
9296

9397
private int operationTimeout = DEFAULT_OPERATION_TIMEOUT;
@@ -169,6 +173,31 @@ public void setModifyTopicConfigs(boolean modifyTopicConfigs) {
169173
this.modifyTopicConfigs = modifyTopicConfigs;
170174
}
171175

176+
/**
177+
* Set a predicate that returns true if a discovered {@link NewTopic} bean should be
178+
* considered for creation or modification by this admin instance. The default
179+
* predicate returns true for all {@link NewTopic}s. Used by the default
180+
* implementation of {@link #newTopics()}.
181+
* @param createOrModifyTopic the predicate.
182+
* @since 2.9.10
183+
* @see #newTopics()
184+
*/
185+
public void setCreateOrModifyTopic(Predicate<NewTopic> createOrModifyTopic) {
186+
Assert.notNull(createOrModifyTopic, "'createOrModifyTopic' cannot be null");
187+
this.createOrModifyTopic = createOrModifyTopic;
188+
}
189+
190+
/**
191+
* Return the predicate used to determine whether a {@link NewTopic} should be
192+
* considered for creation or modification.
193+
* @return the predicate.
194+
* @since 2.9.10
195+
* @see #newTopics()
196+
*/
197+
protected Predicate<NewTopic> getCreateOrModifyTopic() {
198+
return this.createOrModifyTopic;
199+
}
200+
172201
@Override
173202
public Map<String, Object> getConfigurationProperties() {
174203
Map<String, Object> configs2 = new HashMap<>(this.configs);
@@ -238,10 +267,17 @@ public final boolean initialize() {
238267
return false;
239268
}
240269

241-
/*
242-
* Remove any TopicForRetryable bean if there is also a NewTopic with the same topic name.
270+
/**
271+
* Return a collection of {@link NewTopic}s to create or modify. The default
272+
* implementation retrieves all {@link NewTopic} beans in the application context and
273+
* applies the {@link #setCreateOrModifyTopic(Predicate)} predicate to each one. It
274+
* also removes any {@link TopicForRetryable} bean if there is also a NewTopic with
275+
* the same topic name. This is performed before calling the predicate.
276+
* @return the collection of {@link NewTopic}s.
277+
* @since 2.9.10
278+
* @see #setCreateOrModifyTopic(Predicate)
243279
*/
244-
private Collection<NewTopic> newTopics() {
280+
protected Collection<NewTopic> newTopics() {
245281
Map<String, NewTopic> newTopicsMap = new HashMap<>(
246282
this.applicationContext.getBeansOfType(NewTopic.class, false, false));
247283
Map<String, NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false);
@@ -269,6 +305,13 @@ private Collection<NewTopic> newTopics() {
269305
newTopicsMap.remove(entry.getKey());
270306
}
271307
}
308+
Iterator<Entry<String, NewTopic>> iterator = newTopicsMap.entrySet().iterator();
309+
while (iterator.hasNext()) {
310+
Entry<String, NewTopic> next = iterator.next();
311+
if (!this.createOrModifyTopic.test(next.getValue())) {
312+
iterator.remove();
313+
}
314+
}
272315
return new ArrayList<>(newTopicsMap.values());
273316
}
274317

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2022 the original author or authors.
2+
* Copyright 2017-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -85,6 +85,9 @@ public class KafkaAdminTests {
8585
@Autowired
8686
private NewTopic mismatchconfig;
8787

88+
@Autowired
89+
private NewTopic dontCreateThisOne;
90+
8891
@Test
8992
public void testTopicConfigs() {
9093
assertThat(topic1.configs()).containsEntry(
@@ -97,6 +100,7 @@ public void testTopicConfigs() {
97100
.replicas(3)
98101
.build().replicationFactor()).isEqualTo((short) 3);
99102
assertThat(topic3.replicasAssignments()).hasSize(3);
103+
assertThat(admin.newTopics()).doesNotContain(this.dontCreateThisOne);
100104
}
101105

102106
@Test
@@ -269,6 +273,7 @@ public KafkaAdmin admin() {
269273
KafkaAdmin admin = new KafkaAdmin(configs);
270274
admin.setBootstrapServersSupplier(() ->
271275
StringUtils.arrayToCommaDelimitedString(kafkaEmbedded().getBrokerAddresses()));
276+
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreate"));
272277
return admin;
273278
}
274279

@@ -338,6 +343,11 @@ public NewTopics topics456() {
338343
.build());
339344
}
340345

346+
@Bean
347+
NewTopic dontCreateThisOne() {
348+
return TopicBuilder.name("dontCreate").build();
349+
}
350+
341351
}
342352

343353
}

0 commit comments

Comments
 (0)