Skip to content

Commit 1140d5e

Browse files
garyrussellartembilan
authored andcommitted
GH-1641: Reconfigurable Producer Factory
Resolves #1641 **I will back-port; the test will need some tweaks**
1 parent 1ff7823 commit 1140d5e

File tree

5 files changed

+99
-7
lines changed

5 files changed

+99
-7
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -140,8 +140,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
140140

141141
private Duration physicalCloseTimeout = DEFAULT_PHYSICAL_CLOSE_TIMEOUT;
142142

143-
private String transactionIdPrefix;
144-
145143
private ApplicationContext applicationContext;
146144

147145
private String beanName = "not.managed.by.Spring";
@@ -150,10 +148,12 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
150148

151149
private boolean producerPerThread;
152150

153-
private String clientIdPrefix;
154-
155151
private long maxAge;
156152

153+
private volatile String transactionIdPrefix;
154+
155+
private volatile String clientIdPrefix;
156+
157157
private volatile CloseSafeProducer<K, V> producer;
158158

159159
/**
@@ -412,6 +412,34 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {
412412
return this.postProcessors.remove(postProcessor);
413413
}
414414

415+
@Override
416+
public void updateConfigs(Map<String, Object> updates) {
417+
updates.entrySet().forEach(entry -> {
418+
if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
419+
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
420+
+ "' must be a String, not a " + entry.getClass().getName());
421+
Assert.isTrue(this.transactionIdPrefix != null
422+
? entry.getValue() != null
423+
: entry.getValue() == null,
424+
"Cannot change transactional capability");
425+
this.transactionIdPrefix = (String) entry.getValue();
426+
}
427+
else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) {
428+
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG
429+
+ "' must be a String, not a " + entry.getClass().getName());
430+
this.clientIdPrefix = (String) entry.getValue();
431+
}
432+
else {
433+
this.configs.put(entry.getKey(), entry.getValue());
434+
}
435+
});
436+
}
437+
438+
@Override
439+
public void removeConfig(String configKey) {
440+
this.configs.remove(configKey);
441+
}
442+
415443
/**
416444
* When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream.
417445
*/

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ default boolean transactionCapable() {
8383
* @since 1.3.8
8484
*/
8585
default void closeProducerFor(String transactionIdSuffix) {
86-
// NOSONAR
8786
}
8887

8988
/**
@@ -101,15 +100,13 @@ default boolean isProducerPerConsumerPartition() {
101100
* @since 2.3
102101
*/
103102
default void closeThreadBoundProducer() {
104-
// NOSONAR
105103
}
106104

107105
/**
108106
* Reset any state in the factory, if supported.
109107
* @since 2.4
110108
*/
111109
default void reset() {
112-
// NOSONAR
113110
}
114111

115112
/**
@@ -236,6 +233,23 @@ default List<ProducerPostProcessor<K, V>> getPostProcessors() {
236233
return Collections.emptyList();
237234
}
238235

236+
/**
237+
* Update the producer configuration map; useful for situations such as
238+
* credential rotation.
239+
* @param updates the configuration properties to update.
240+
* @since 2.5.10
241+
*/
242+
default void updateConfigs(Map<String, Object> updates) {
243+
}
244+
245+
/**
246+
* Remove the specified key from the configuration map.
247+
* @param configKey the key to remove.
248+
* @since 2.5.10
249+
*/
250+
default void removeConfig(String configKey) {
251+
}
252+
239253
/**
240254
* Called whenever a producer is added or removed.
241255
*

spring-kafka/src/test/java/org/springframework/kafka/core/DefaultKafkaProducerFactoryTests.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2122
import static org.mockito.ArgumentMatchers.any;
2223
import static org.mockito.ArgumentMatchers.isNull;
2324
import static org.mockito.BDDMockito.given;
@@ -413,4 +414,33 @@ protected Producer createRawProducer(Map configs) {
413414
pf.destroy();
414415
}
415416

417+
@SuppressWarnings({ "rawtypes", "unchecked" })
418+
@Test
419+
void configUpdates() {
420+
Map<String, Object> configs = new HashMap<>();
421+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
422+
DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(configs);
423+
assertThat(pf.getConfigurationProperties()).hasSize(2);
424+
configs.remove(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
425+
configs.put(ProducerConfig.ACKS_CONFIG, "all");
426+
pf.updateConfigs(configs);
427+
assertThat(pf.getConfigurationProperties()).hasSize(3);
428+
pf.removeConfig(ProducerConfig.ACKS_CONFIG);
429+
assertThat(pf.getConfigurationProperties()).hasSize(2);
430+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-");
431+
assertThatIllegalArgumentException().isThrownBy(() -> pf.updateConfigs(configs));
432+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
433+
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId");
434+
DefaultKafkaProducerFactory pf1 = new DefaultKafkaProducerFactory<>(configs);
435+
assertThat(pf1.getConfigurationProperties()).hasSize(5);
436+
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "clientId2");
437+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx2-");
438+
pf1.updateConfigs(configs);
439+
assertThat(pf1.getConfigurationProperties()).hasSize(5);
440+
assertThat(KafkaTestUtils.getPropertyValue(pf1, "clientIdPrefix")).isEqualTo("clientId2");
441+
assertThat(KafkaTestUtils.getPropertyValue(pf1, "transactionIdPrefix")).isEqualTo("tx2-");
442+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
443+
assertThatIllegalArgumentException().isThrownBy(() -> pf1.updateConfigs(configs));
444+
}
445+
416446
}

src/reference/asciidoc/kafka.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,22 @@ public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
531531
----
532532
====
533533

534+
Starting with version 2.5.10, you can now update the producer properties after the factory is created.
535+
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
536+
The changes will not affect existing producer instances; call `reset()` to close any existing producers so that new producers will be created using the new properties.
537+
NOTE: You cannot change a transactional producer factory to non-transactional, and vice-versa.
538+
539+
Two new methods are now provided:
540+
541+
====
542+
[source, java]
543+
----
544+
void updateConfigs(Map<String, Object> updates);
545+
546+
void removeConfig(String configKey);
547+
----
548+
====
549+
534550
[[replying-template]]
535551
===== Using `ReplyingKafkaTemplate`
536552

src/reference/asciidoc/whats-new.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,7 @@ See <<seek-to-current>>, <<after-rollback>>, <<recovering-batch-eh>> for more in
4242

4343
You can now set a maximum age for producers after which they will be closed and recreated.
4444
See <<transactions>> for more information.
45+
46+
You can now update the configuration map after the `DefaultKafkaProducerFactory` has been created.
47+
This might be useful, for example, if you have to update SSL key/trust store locations after a credentials change.
48+
See <<producer-factory>> for more information.

0 commit comments

Comments
 (0)