Skip to content

Commit 813e104

Browse files
garyrussellartembilan
authored andcommitted
GH-1776: Fix transactional Template with Overrides
Resolves #1776 Set `transactional` flag using overrides. **cherry-pick to 2.6.x, 2.5.x**
1 parent ffa6f78 commit 813e104

File tree

2 files changed

+3
-1
lines changed

2 files changed

+3
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,6 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
171171

172172
Assert.notNull(producerFactory, "'producerFactory' cannot be null");
173173
this.autoFlush = autoFlush;
174-
this.transactional = producerFactory.transactionCapable();
175174
this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
176175
this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
177176
if (this.customProducerFactory) {
@@ -187,6 +186,7 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
187186
else {
188187
this.producerFactory = producerFactory;
189188
}
189+
this.transactional = this.producerFactory.transactionCapable();
190190
}
191191

192192
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,12 +447,14 @@ void testConfigOverrides() {
447447
pf.setProducerPerThread(true);
448448
Map<String, Object> overrides = new HashMap<>();
449449
overrides.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
450+
overrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TX");
450451
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true, overrides);
451452
assertThat(template.getProducerFactory().getConfigurationProperties()
452453
.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(StringSerializer.class);
453454
assertThat(template.getProducerFactory().getPhysicalCloseTimeout()).isEqualTo(Duration.ofSeconds(6));
454455
assertThat(template.getProducerFactory().isProducerPerConsumerPartition()).isFalse();
455456
assertThat(template.getProducerFactory().isProducerPerThread()).isTrue();
457+
assertThat(template.isTransactional()).isTrue();
456458
}
457459

458460
@Test

0 commit comments

Comments
 (0)