Skip to content

Commit c4787fc

Browse files
thst71artembilan
authored andcommitted
GH-1983: Support copy of producer factory
Fixes #1983 Preserves post processors for of custom factories Instrumentations attached to ProducerFactory instances are destroyed if the user adds configOverrides. This behavior is unexpected for the developer, for example it will undo sleuth instrumentation that is otherwise kept if the new KafkaTemplate(Map) constructor would be used. * adds copy for all visible factory properties and tests * moves the copy feature to ProducerFactory * fixing review remarks * removes a left over TODO * implements a generic copy in the KafkaTemplate to avoid breaking changes * wraps javadoc at col 90 * Clean up for code style **Cherry-pick to `2.5.x`, `2.6.x` & `2.7.x`** # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java # spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java # spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java
1 parent 4a75140 commit c4787fc

File tree

4 files changed

+250
-29
lines changed

4 files changed

+250
-29
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,11 @@
108108
* @author Nakul Mishra
109109
* @author Artem Bilan
110110
* @author Chris Gilbert
111+
* @author Thomas Strauß
111112
*/
112113
public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
113114
implements ProducerFactory<K, V>, ApplicationContextAware,
114-
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
115+
BeanNameAware, ApplicationListener<ContextStoppedEvent>, DisposableBean {
115116

116117
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DefaultKafkaProducerFactory.class));
117118

@@ -360,6 +361,63 @@ public void setMaxAge(Duration maxAge) {
360361
this.maxAge = maxAge.toMillis();
361362
}
362363

364+
/**
365+
* Copy properties of the instance and the given properties to create a new producer factory.
366+
* <p>If the {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} makes a
367+
* copy of itself, the transaction id prefix is recovered from the properties. If
368+
* you want to change the ID config, add a new
369+
* {@link org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG}
370+
* key to the override config.</p>
371+
* @param overrideProperties the properties to be applied to the new factory
372+
* @return {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} with
373+
* properties applied
374+
*/
375+
@Override
376+
public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> overrideProperties) {
377+
Map<String, Object> producerProperties = new HashMap<>(getConfigurationProperties());
378+
producerProperties.putAll(overrideProperties);
379+
producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties);
380+
DefaultKafkaProducerFactory<K, V> newFactory =
381+
new DefaultKafkaProducerFactory<>(producerProperties,
382+
getKeySerializerSupplier(),
383+
getValueSerializerSupplier());
384+
newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
385+
newFactory.setProducerPerConsumerPartition(isProducerPerConsumerPartition());
386+
newFactory.setProducerPerThread(isProducerPerThread());
387+
for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) {
388+
newFactory.addPostProcessor(templatePostProcessor);
389+
}
390+
for (ProducerFactory.Listener<K, V> templateListener : getListeners()) {
391+
newFactory.addListener(templateListener);
392+
}
393+
return newFactory;
394+
}
395+
396+
397+
/**
398+
* Ensures that the returned properties map contains a transaction id prefix.
399+
* The {@link org.springframework.kafka.core.DefaultKafkaProducerFactory}
400+
* modifies the local properties copy, the txn key is removed and
401+
* stored locally in a property. To make a proper copy of the properties in a
402+
* new factory, the transactionId has to be reinserted prior use.
403+
* The incoming properties are checked for a transactionId key. If none is
404+
* there, the one existing in the factory is added.
405+
* @param producerProperties the properties to be used for the new factory
406+
* @return the producerProperties or a copy with the transaction ID set
407+
*/
408+
private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<String, Object> producerProperties) {
409+
String transactionIdPrefix = getTransactionIdPrefix();
410+
if (StringUtils.hasText(transactionIdPrefix)) {
411+
if (!producerProperties.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
412+
Map<String, Object> producerPropertiesWithTxnId = new HashMap<>(producerProperties);
413+
producerPropertiesWithTxnId.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
414+
return producerPropertiesWithTxnId;
415+
}
416+
}
417+
418+
return producerProperties;
419+
}
420+
363421
/**
364422
* Add a listener.
365423
* @param listener the listener.
@@ -417,8 +475,8 @@ public void updateConfigs(Map<String, Object> updates) {
417475
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
418476
+ "' must be a String, not a " + entry.getClass().getName());
419477
Assert.isTrue(this.transactionIdPrefix != null
420-
? entry.getValue() != null
421-
: entry.getValue() == null,
478+
? entry.getValue() != null
479+
: entry.getValue() == null,
422480
"Cannot change transactional capability");
423481
this.transactionIdPrefix = (String) entry.getValue();
424482
}
@@ -694,7 +752,7 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
694752
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
695753
if (producerToRemove.epoch != this.epoch.get()
696754
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
697-
&& !txIdCache.offer(producerToRemove))) {
755+
&& !txIdCache.offer(producerToRemove))) {
698756
producerToRemove.closeDelegate(timeout, this.listeners);
699757
return true;
700758
}
@@ -942,7 +1000,7 @@ public void abortTransaction() throws ProducerFencedException {
9421000
LOGGER.debug(() -> toString() + " abortTransaction()");
9431001
if (this.producerFailed != null) {
9441002
LOGGER.debug(() -> "abortTransaction ignored - previous txFailed: " + this.producerFailed.getMessage()
945-
+ ": " + this);
1003+
+ ": " + this);
9461004
}
9471005
else {
9481006
try {

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,8 @@
7575
* @author Igor Stepanov
7676
* @author Artem Bilan
7777
* @author Biju Kunjummen
78-
* @author Endika Guti?rrez
78+
* @author Endika Gutierrez
79+
* @author Thomas Strauß
7980
*/
8081
public class KafkaTemplate<K, V> implements KafkaOperations<K, V>, ApplicationContextAware, BeanNameAware,
8182
ApplicationListener<ContextStoppedEvent>, DisposableBean {
@@ -158,8 +159,16 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush) {
158159
* to occur immediately, regardless of that setting, or if you wish to block until the
159160
* broker has acknowledged receipt according to the producer's {@code acks} property.
160161
* If the configOverrides is not null or empty, a new
161-
* {@link DefaultKafkaProducerFactory} will be created with merged producer properties
162-
* with the overrides being applied after the supplied factory's properties.
162+
* {@link ProducerFactory} will be created using
163+
* {@link org.springframework.kafka.core.ProducerFactory#copyWithConfigurationOverride(java.util.Map)}
164+
* The factory shall apply the overrides after the supplied factory's properties.
165+
* The {@link org.springframework.kafka.core.ProducerPostProcessor}s from the
166+
* original factory are copied over to keep instrumentation alive.
167+
* Registered {@link org.springframework.kafka.core.ProducerFactory.Listener}s are
168+
* also added to the new factory. If the factory implementation does not support
169+
* the copy operation, a generic copy of the ProducerFactory is created which will
170+
* be of type
171+
* DefaultKafkaProducerFactory.
163172
* @param producerFactory the producer factory.
164173
* @param autoFlush true to flush after each send.
165174
* @param configOverrides producer configuration properties to override.
@@ -174,21 +183,57 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
174183
this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
175184
this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
176185
if (this.customProducerFactory) {
177-
Map<String, Object> configs = new HashMap<>(producerFactory.getConfigurationProperties());
178-
configs.putAll(configOverrides);
179-
DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(configs,
180-
producerFactory.getKeySerializerSupplier(), producerFactory.getValueSerializerSupplier());
181-
newFactory.setPhysicalCloseTimeout((int) producerFactory.getPhysicalCloseTimeout().getSeconds());
182-
newFactory.setProducerPerConsumerPartition(producerFactory.isProducerPerConsumerPartition());
183-
newFactory.setProducerPerThread(producerFactory.isProducerPerThread());
184-
this.producerFactory = newFactory;
186+
this.producerFactory = copyProducerFactoryWithOverrides(producerFactory, configOverrides);
185187
}
186188
else {
187189
this.producerFactory = producerFactory;
188190
}
189191
this.transactional = this.producerFactory.transactionCapable();
190192
}
191193

194+
private ProducerFactory<K, V> copyProducerFactoryWithOverrides(ProducerFactory<K, V> templateFactory,
195+
Map<String, Object> configOverrides) {
196+
197+
ProducerFactory<K, V> newFactory;
198+
try {
199+
newFactory = templateFactory.copyWithConfigurationOverride(configOverrides);
200+
}
201+
catch (UnsupportedOperationException e) {
202+
newFactory = handleNonCopyableProducerFactory(templateFactory, configOverrides);
203+
}
204+
205+
return newFactory;
206+
}
207+
208+
/**
209+
* This method copies a ProducerFactory that misses the implementation of
210+
* {@link org.springframework.kafka.core.ProducerFactory#copyWithConfigurationOverride(java.util.Map)}.
211+
*
212+
* @param templateFactory the ProducerFactory to copy from
213+
* @param configOverrides new properties to be applied onto the templateFactory properties
214+
* @return a DefaultKafkaProducerFactory configured with configOverrides and all
215+
* public reachable settings of ProducerFactory
216+
*/
217+
private DefaultKafkaProducerFactory<K, V> handleNonCopyableProducerFactory(ProducerFactory<K, V> templateFactory,
218+
Map<String, Object> configOverrides) {
219+
220+
Map<String, Object> producerProperties = new HashMap<>(templateFactory.getConfigurationProperties());
221+
producerProperties.putAll(configOverrides);
222+
DefaultKafkaProducerFactory<K, V> defaultFactory = new DefaultKafkaProducerFactory<>(producerProperties,
223+
templateFactory.getKeySerializerSupplier(),
224+
templateFactory.getValueSerializerSupplier());
225+
defaultFactory.setPhysicalCloseTimeout((int) templateFactory.getPhysicalCloseTimeout().getSeconds());
226+
defaultFactory.setProducerPerConsumerPartition(templateFactory.isProducerPerConsumerPartition());
227+
defaultFactory.setProducerPerThread(templateFactory.isProducerPerThread());
228+
for (ProducerPostProcessor<K, V> templatePostProcessor : templateFactory.getPostProcessors()) {
229+
defaultFactory.addPostProcessor(templatePostProcessor);
230+
}
231+
for (ProducerFactory.Listener<K, V> templateListener : templateFactory.getListeners()) {
232+
defaultFactory.addListener(templateListener);
233+
}
234+
return defaultFactory;
235+
}
236+
192237
@Override
193238
public void setBeanName(String name) {
194239
this.beanName = name;

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactory.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -34,6 +34,7 @@
3434
* @param <V> the value type.
3535
*
3636
* @author Gary Russell
37+
* @author Thomas Strauß
3738
*/
3839
public interface ProducerFactory<K, V> {
3940

@@ -141,7 +142,7 @@ default Supplier<Serializer<K>> getKeySerializerSupplier() {
141142

142143
/**
143144
* Return true when there is a producer per thread.
144-
* @return the produver per thread.
145+
* @return the producer per thread.
145146
* @since 2.5
146147
*/
147148
default boolean isProducerPerThread() {
@@ -250,6 +251,26 @@ default void updateConfigs(Map<String, Object> updates) {
250251
default void removeConfig(String configKey) {
251252
}
252253

254+
/**
255+
* Copy the properties of the instance and the given properties to create a new producer factory.
256+
* <p>The copy shall prioritize the override properties over the configured values.
257+
* It is in the responsibility of the factory implementation to make sure the
258+
* configuration of the new factory is identical, complete and correct.</p>
259+
* <p>ProducerPostProcessor and Listeners must stay intact.</p>
260+
* <p>If the factory does not implement this method, an exception will be thrown.</p>
261+
* <p>Note: see
262+
* {@link org.springframework.kafka.core.DefaultKafkaProducerFactory#copyWithConfigurationOverride}</p>
263+
* @param overrideProperties the properties to be applied to the new factory
264+
* @return {@link org.springframework.kafka.core.ProducerFactory} with properties
265+
* applied
266+
* @since 2.5.17
267+
* @see org.springframework.kafka.core.KafkaTemplate#KafkaTemplate(ProducerFactory, java.util.Map)
268+
*/
269+
default ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> overrideProperties) {
270+
throw new UnsupportedOperationException(
271+
"This factory implementation doesn't support creating reconfigured copies.");
272+
}
273+
253274
/**
254275
* Called whenever a producer is added or removed.
255276
*
@@ -271,9 +292,8 @@ default void producerAdded(String id, Producer<K, V> producer) {
271292
}
272293

273294
/**
274-
* An exsting producer was removed.
275-
* @param id the producer id (factory bean name and client.id separated by a
276-
* period).
295+
* An existing producer was removed.
296+
* @param id the producer id (factory bean name and client.id separated by a period).
277297
* @param producer the producer.
278298
*/
279299
default void producerRemoved(String id, Producer<K, V> producer) {

0 commit comments

Comments
 (0)