Skip to content

Commit 0b61b5d

Browse files
garyrussellartembilan
authored andcommitted
GH-1736: Depr. StreamsBuilderFactoryBeanCustomizer
Resolves #1736 **cherry-pick to 2.6.x**
1 parent 73d853b commit 0b61b5d

File tree

7 files changed

+351
-10
lines changed

7 files changed

+351
-10
lines changed

spring-kafka-docs/src/main/asciidoc/streams.adoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,8 +229,8 @@ To avoid boilerplate code for most cases, especially when you develop microservi
229229
All you need is to declare a `KafkaStreamsConfiguration` bean named `defaultKafkaStreamsConfig`.
230230
A `StreamsBuilderFactoryBean` bean, named `defaultKafkaStreamsBuilder`, is automatically declared in the application context.
231231
You can declare and use any additional `StreamsBuilderFactoryBean` beans as well.
232-
Starting with version 2.3, you can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanCustomizer`.
233-
There must only be one such bean, or one must be marked `@Primary`.
232+
You can perform additional customization of that bean, by providing a bean that implements `StreamsBuilderFactoryBeanConfigurer`.
233+
If there are multiple such beans, they will be applied according to their `Ordered.order` property.
234234

235235
By default, when the factory bean is stopped, the `KafkaStreams.cleanUp()` method is called.
236236
Starting with version 2.1.2, the factory bean has additional constructors, taking a `CleanupConfig` object that has properties to let you control whether the `cleanUp()` method is called during `start()` or `stop()` or neither.
@@ -339,7 +339,7 @@ public static class KafkaStreamsConfig {
339339
}
340340
341341
@Bean
342-
public StreamsBuilderFactoryBeanCustomizer customizer() {
342+
public StreamsBuilderFactoryBeanConfigurer configurer() {
343343
return fb -> fb.setStateListener((newState, oldState) -> {
344344
System.out.println("State transition from " + oldState + " to " + newState);
345345
});

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2019 the original author or authors.
2+
* Copyright 2017-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,14 +16,17 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19+
import java.util.HashSet;
20+
import java.util.Set;
21+
1922
import org.springframework.beans.factory.ObjectProvider;
2023
import org.springframework.beans.factory.UnsatisfiedDependencyException;
2124
import org.springframework.beans.factory.annotation.Qualifier;
2225
import org.springframework.context.annotation.Bean;
2326
import org.springframework.context.annotation.Configuration;
2427
import org.springframework.kafka.config.KafkaStreamsConfiguration;
2528
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
26-
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
29+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
2730

2831
/**
2932
* {@code @Configuration} class that registers a {@link StreamsBuilderFactoryBean}
@@ -53,17 +56,33 @@ public class KafkaStreamsDefaultConfiguration {
5356
*/
5457
public static final String DEFAULT_STREAMS_BUILDER_BEAN_NAME = "defaultKafkaStreamsBuilder";
5558

59+
/**
60+
* Bean for the default {@link StreamsBuilderFactoryBean}.
61+
* @param streamsConfigProvider the streams config.
62+
* @param customizerProvider the customizer.
63+
* @param configurerProvider the configurer.
64+
*
65+
* @return the factory bean.
66+
*/
67+
@SuppressWarnings("deprecation")
5668
@Bean(name = DEFAULT_STREAMS_BUILDER_BEAN_NAME)
5769
public StreamsBuilderFactoryBean defaultKafkaStreamsBuilder(
5870
@Qualifier(DEFAULT_STREAMS_CONFIG_BEAN_NAME)
5971
ObjectProvider<KafkaStreamsConfiguration> streamsConfigProvider,
60-
ObjectProvider<StreamsBuilderFactoryBeanCustomizer> customizerProvider) {
72+
ObjectProvider<org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer> customizerProvider,
73+
ObjectProvider<StreamsBuilderFactoryBeanConfigurer> configurerProvider) {
6174

6275
KafkaStreamsConfiguration streamsConfig = streamsConfigProvider.getIfAvailable();
6376
if (streamsConfig != null) {
6477
StreamsBuilderFactoryBean fb = new StreamsBuilderFactoryBean(streamsConfig);
65-
StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider.getIfUnique();
66-
if (customizer != null) {
78+
Set<StreamsBuilderFactoryBeanConfigurer> configuredBy = new HashSet<>();
79+
configurerProvider.orderedStream().forEach(configurer -> {
80+
configurer.configure(fb);
81+
configuredBy.add(configurer);
82+
});
83+
org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer customizer = customizerProvider
84+
.getIfUnique();
85+
if (customizer != null && !configuredBy.contains(customizer)) {
6786
customizer.configure(fb);
6887
}
6988
return fb;
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.config;
18+
19+
import org.springframework.core.Ordered;
20+
21+
/**
22+
* A configurer for {@link StreamsBuilderFactoryBean}. Applied, in order, to the single
23+
* {@link StreamsBuilderFactoryBean} configured by the framework. Invoked after the bean
24+
* is created and before it is started. Default order is 0.
25+
*
26+
* @author Gary Russell
27+
* @since 2.6.7
28+
*
29+
*/
30+
@FunctionalInterface
31+
@SuppressWarnings("deprecation")
32+
public interface StreamsBuilderFactoryBeanConfigurer extends StreamsBuilderFactoryBeanCustomizer, Ordered {
33+
34+
/**
35+
* Overridden to avoid deprecation warnings.
36+
*/
37+
@Override
38+
void configure(StreamsBuilderFactoryBean factoryBean);
39+
40+
@Override
41+
default int getOrder() {
42+
return 0;
43+
}
44+
45+
}

spring-kafka/src/main/java/org/springframework/kafka/config/StreamsBuilderFactoryBeanCustomizer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,11 +22,14 @@
2222
* implementation of this interface is found in the application context (or one is marked
2323
* as {@link org.springframework.context.annotation.Primary}, it will be invoked after the
2424
* factory bean has been created and before it is started.
25+
* @deprecated in favor of {@code StreamsBuilderFactoryBeanConfigurer} due to a name
26+
* clash with a similar class in Spring Boot.
2527
*
2628
* @author Gary Russell
2729
* @since 2.3
2830
*
2931
*/
32+
@Deprecated
3033
@FunctionalInterface
3134
public interface StreamsBuilderFactoryBeanCustomizer {
3235

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.streams;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.ArrayList;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
26+
import org.apache.kafka.common.serialization.Serdes;
27+
import org.apache.kafka.streams.StreamsBuilder;
28+
import org.apache.kafka.streams.StreamsConfig;
29+
import org.apache.kafka.streams.kstream.KStream;
30+
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;
31+
import org.junit.jupiter.api.Test;
32+
33+
import org.springframework.beans.factory.annotation.Autowired;
34+
import org.springframework.beans.factory.annotation.Value;
35+
import org.springframework.context.annotation.Bean;
36+
import org.springframework.context.annotation.Configuration;
37+
import org.springframework.kafka.annotation.EnableKafkaStreams;
38+
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
39+
import org.springframework.kafka.config.KafkaStreamsConfiguration;
40+
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
41+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
42+
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;
43+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
44+
import org.springframework.kafka.test.context.EmbeddedKafka;
45+
import org.springframework.test.annotation.DirtiesContext;
46+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
47+
48+
/**
49+
* @author Gary Russell
50+
* @since 2.7
51+
*
52+
*/
53+
@SpringJUnitConfig
54+
@DirtiesContext
55+
@EmbeddedKafka(partitions = 1,
56+
topics = Configurer1Tests.STREAMING_TOPIC1,
57+
brokerProperties = {
58+
"auto.create.topics.enable=${topics.autoCreate:false}",
59+
"delete.topic.enable=${topic.delete:true}" },
60+
brokerPropertiesLocation = "classpath:/${broker.filename:broker}.properties")
61+
public class Configurer1Tests {
62+
63+
public static final String STREAMING_TOPIC1 = "Configurer1Tests1";
64+
65+
@Test
66+
void appliedInOrder(@Autowired List<Integer> callOrder) {
67+
assertThat(callOrder).containsExactly(1, 2, 3);
68+
}
69+
70+
@Configuration
71+
@EnableKafkaStreams
72+
public static class Config {
73+
74+
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
75+
private String brokerAddresses;
76+
77+
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
78+
public KafkaStreamsConfiguration kStreamsConfigs() {
79+
Map<String, Object> props = new HashMap<>();
80+
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "configurer1");
81+
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
82+
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
83+
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
84+
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
85+
WallclockTimestampExtractor.class.getName());
86+
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "100");
87+
return new KafkaStreamsConfiguration(props);
88+
}
89+
90+
@Bean
91+
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
92+
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
93+
stream.foreach((K, v) -> { });
94+
return stream;
95+
}
96+
97+
@Bean
98+
List<Integer> callOrder() {
99+
return new ArrayList<>();
100+
}
101+
102+
@Bean
103+
StreamsBuilderFactoryBeanConfigurer three(List<Integer> callOrder) {
104+
return new StreamsBuilderFactoryBeanConfigurer() {
105+
106+
@Override
107+
public void configure(StreamsBuilderFactoryBean factoryBean) {
108+
callOrder.add(3);
109+
}
110+
111+
@Override
112+
public int getOrder() {
113+
return Integer.MAX_VALUE;
114+
}
115+
116+
};
117+
}
118+
119+
@Bean
120+
StreamsBuilderFactoryBeanConfigurer one(List<Integer> callOrder) {
121+
return new StreamsBuilderFactoryBeanConfigurer() {
122+
123+
@Override
124+
public void configure(StreamsBuilderFactoryBean factoryBean) {
125+
callOrder.add(1);
126+
}
127+
128+
@Override
129+
public int getOrder() {
130+
return Integer.MIN_VALUE;
131+
}
132+
133+
};
134+
}
135+
136+
@Bean
137+
StreamsBuilderFactoryBeanConfigurer two(List<Integer> callOrder) {
138+
return new StreamsBuilderFactoryBeanConfigurer() {
139+
140+
@Override
141+
public void configure(StreamsBuilderFactoryBean factoryBean) {
142+
callOrder.add(2);
143+
}
144+
145+
};
146+
}
147+
148+
@SuppressWarnings("deprecation")
149+
@Bean
150+
public StreamsBuilderFactoryBeanCustomizer wontBeFoundNotUnique(List<Integer> callOrder) {
151+
return fb -> fb.setStateListener((newState, oldState) -> {
152+
callOrder.add(4);
153+
});
154+
}
155+
156+
}
157+
158+
}

0 commit comments

Comments
 (0)