Skip to content

Commit 06483a0

Browse files
garyrussellartembilan
authored andcommitted
GH-1776: Fix transactional Template with Overrides
Resolves #1776 Set `transactional` flag using overrides. **cherry-pick to 2.6.x, 2.5.x**
1 parent dc9a58e commit 06483a0

File tree

2 files changed

+3
-1
lines changed

2 files changed

+3
-1
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,6 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
170170

171171
Assert.notNull(producerFactory, "'producerFactory' cannot be null");
172172
this.autoFlush = autoFlush;
173-
this.transactional = producerFactory.transactionCapable();
174173
this.micrometerEnabled = KafkaUtils.MICROMETER_PRESENT;
175174
this.customProducerFactory = configOverrides != null && configOverrides.size() > 0;
176175
if (this.customProducerFactory) {
@@ -186,6 +185,7 @@ public KafkaTemplate(ProducerFactory<K, V> producerFactory, boolean autoFlush,
186185
else {
187186
this.producerFactory = producerFactory;
188187
}
188+
this.transactional = this.producerFactory.transactionCapable();
189189
}
190190

191191
@Override

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,12 +447,14 @@ void testConfigOverrides() {
447447
pf.setProducerPerThread(true);
448448
Map<String, Object> overrides = new HashMap<>();
449449
overrides.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
450+
overrides.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "TX");
450451
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf, true, overrides);
451452
assertThat(template.getProducerFactory().getConfigurationProperties()
452453
.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)).isEqualTo(StringSerializer.class);
453454
assertThat(template.getProducerFactory().getPhysicalCloseTimeout()).isEqualTo(Duration.ofSeconds(6));
454455
assertThat(template.getProducerFactory().isProducerPerConsumerPartition()).isFalse();
455456
assertThat(template.getProducerFactory().isProducerPerThread()).isTrue();
457+
assertThat(template.isTransactional()).isTrue();
456458
}
457459

458460
@Test

0 commit comments

Comments
 (0)