Skip to content

Commit 00bf6ae

Browse files
committed
GH-1983: No ProducerFactory copy in KafkaTemplate
Fixes #1983 It is a misdirection to use a `DefaultProducerFactory` in the `KafkaTemplate` to create a copy of the provided `ProducerFactory`. Now it is a target project responsibility to implement `copyWithConfigurationOverride()` properly
1 parent d4b88b0 commit 00bf6ae

File tree

2 files changed

+2
-104
lines changed

2 files changed

+2
-104
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -197,57 +197,14 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
197197
this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
198198
this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
199199
if (this.customProducerFactory) {
200-
this.producerFactory = copyProducerFactoryWithOverrides(producerFactory, configOverrides);
200+
this.producerFactory = producerFactory.copyWithConfigurationOverride(configOverrides);
201201
}
202202
else {
203203
this.producerFactory = producerFactory;
204204
}
205205
this.transactional = this.producerFactory.transactionCapable();
206206
}
207207

208-
private ProducerFactory<K, V> copyProducerFactoryWithOverrides(ProducerFactory<K, V> templateFactory,
209-
Map<String, Object> configOverrides) {
210-
211-
ProducerFactory<K, V> newFactory;
212-
try {
213-
newFactory = templateFactory.copyWithConfigurationOverride(configOverrides);
214-
}
215-
catch (UnsupportedOperationException e) {
216-
newFactory = handleNonCopyableProducerFactory(templateFactory, configOverrides);
217-
}
218-
219-
return newFactory;
220-
}
221-
222-
/**
223-
* This method copies a ProducerFactory that misses the implementation of
224-
* {@link org.springframework.kafka.core.ProducerFactory#copyWithConfigurationOverride(java.util.Map)}.
225-
*
226-
* @param templateFactory the ProducerFactory to copy from
227-
* @param configOverrides new properties to be applied onto the templateFactory properties
228-
* @return a DefaultKafkaProducerFactory configured with configOverrides and all
229-
* public reachable settings of ProducerFactory
230-
*/
231-
private DefaultKafkaProducerFactory<K, V> handleNonCopyableProducerFactory(ProducerFactory<K, V> templateFactory,
232-
Map<String, Object> configOverrides) {
233-
234-
Map<String, Object> producerProperties = new HashMap<>(templateFactory.getConfigurationProperties());
235-
producerProperties.putAll(configOverrides);
236-
DefaultKafkaProducerFactory<K, V> defaultFactory = new DefaultKafkaProducerFactory<>(producerProperties,
237-
templateFactory.getKeySerializerSupplier(),
238-
templateFactory.getValueSerializerSupplier());
239-
defaultFactory.setPhysicalCloseTimeout((int) templateFactory.getPhysicalCloseTimeout().getSeconds());
240-
defaultFactory.setProducerPerConsumerPartition(templateFactory.isProducerPerConsumerPartition());
241-
defaultFactory.setProducerPerThread(templateFactory.isProducerPerThread());
242-
for (ProducerPostProcessor<K, V> templatePostProcessor : templateFactory.getPostProcessors()) {
243-
defaultFactory.addPostProcessor(templatePostProcessor);
244-
}
245-
for (ProducerFactory.Listener<K, V> templateListener : templateFactory.getListeners()) {
246-
defaultFactory.addListener(templateListener);
247-
}
248-
return defaultFactory;
249-
}
250-
251208
@Override
252209
public void setBeanName(String name) {
253210
this.beanName = name;

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 1 addition & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ void testWithCallbackFailureFunctional() throws Exception {
454454
final CountDownLatch latch = new CountDownLatch(1);
455455
final AtomicReference<SendResult<Integer, String>> theResult = new AtomicReference<>();
456456
AtomicReference<String> value = new AtomicReference<>();
457-
future.addCallback(result -> { }, (KafkaFailureCallback<Integer, String>) ex -> {
457+
future.addCallback(result -> {}, (KafkaFailureCallback<Integer, String>) ex -> {
458458
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
459459
value.set(failed.value());
460460
latch.countDown();
@@ -525,65 +525,6 @@ void testConfigOverridesWithDefaultKafkaProducerFactory() {
525525
assertThat(templateWTX2_2.getProducerFactory().getTransactionIdPrefix()).isEqualTo("TX2");
526526
}
527527

528-
@Test
529-
void testConfigOverridesWithCustomProducerFactory() {
530-
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
531-
ProducerFactory<String, String> pf = new ProducerFactory<>() {
532-
533-
@Override
534-
public Producer<String, String> createProducer() {
535-
return null;
536-
}
537-
538-
@Override
539-
public List<Listener<String, String>> getListeners() {
540-
return Collections.singletonList(noopListener);
541-
}
542-
543-
@Override
544-
public List<ProducerPostProcessor<String, String>> getPostProcessors() {
545-
return Collections.singletonList(noopProducerPostProcessor);
546-
}
547-
548-
@Override
549-
public Map<String, Object> getConfigurationProperties() {
550-
return Collections.singletonMap(ProducerConfig.ACKS_CONFIG, "all");
551-
}
552-
553-
@Override
554-
public Duration getPhysicalCloseTimeout() {
555-
return Duration.ofSeconds(6);
556-
}
557-
558-
@Override
559-
public boolean isProducerPerConsumerPartition() {
560-
return true;
561-
}
562-
563-
@Override
564-
public boolean isProducerPerThread() {
565-
return false;
566-
}
567-
};
568-
569-
Map<String, Object> overrides = new HashMap<>();
570-
overrides.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
571-
overrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TX");
572-
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true, overrides);
573-
assertThat(template.getProducerFactory()).isOfAnyClassIn(DefaultKafkaProducerFactory.class);
574-
assertThat(template.getProducerFactory().getConfigurationProperties()
575-
.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(StringSerializer.class);
576-
assertThat(template.getProducerFactory().getPhysicalCloseTimeout()).isEqualTo(Duration.ofSeconds(6));
577-
assertThat(template.getProducerFactory().isProducerPerConsumerPartition()).isTrue();
578-
assertThat(template.getProducerFactory().isProducerPerThread()).isFalse();
579-
assertThat(template.isTransactional()).isTrue();
580-
assertThat(template.getProducerFactory().getListeners()).isEqualTo(pf.getListeners());
581-
assertThat(template.getProducerFactory().getListeners().size()).isEqualTo(1);
582-
assertThat(template.getProducerFactory().getPostProcessors()).isEqualTo(pf.getPostProcessors());
583-
assertThat(template.getProducerFactory().getPostProcessors().size()).isEqualTo(1);
584-
assertThat(template.getProducerFactory().getTransactionIdPrefix()).isEqualTo("TX");
585-
}
586-
587528
@Test
588529
void testConfigOverridesWithSerializers() {
589530
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

0 commit comments

Comments
 (0)