Skip to content

Commit b9c9abd

Browse files
authored
GH-2852: API for customizing transactionIdSuffix
Fixes: #2852 * Provide a way to customize the transactionIdSuffix by providing a new strategy API - `TransactionIdSuffixStrategy` * PR adds a default implementation for this new API * Related changes in classes that deal with transactions
1 parent 02205ea commit b9c9abd

File tree

11 files changed

+723
-40
lines changed

11 files changed

+723
-40
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/transactions.adoc

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,43 @@ NOTE: If there is a `KafkaTransactionManager` (or synchronized) transaction in p
102102
Instead, a new "nested" transaction is used.
103103

104104
[[transaction-id-prefix]]
105-
== `transactionIdPrefix`
105+
== `TransactionIdPrefix`
106106

107107
With `EOSMode.V2` (aka `BETA`), the only supported mode, it is no longer necessary to use the same `transactional.id`, even for consumer-initiated transactions; in fact, it must be unique on each instance the same as for producer-initiated transactions.
108108
This property must have a different value on each application instance.
109109

110+
[[transaction-id-suffix-fixed]]
111+
== `TransactionIdSuffix Fixed`
112+
113+
Since 3.2, a new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
114+
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
115+
When a transaction producer is requested and `transactional.id` all in use, throw a `NoProducerAvailableException`.
116+
User can then use a `RetryTemplate` configured to retry that exception, with a suitably configured back off.
117+
118+
[source,java]
119+
----
120+
public static class Config {
121+
122+
@Bean
123+
public ProducerFactory<String, String> myProducerFactory() {
124+
Map<String, Object> configs = producerConfigs();
125+
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
126+
...
127+
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
128+
...
129+
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
130+
pf.setTransactionIdSuffixStrategy(ss);
131+
return pf;
132+
}
133+
134+
}
135+
----
136+
When setting `maxCache` to 5, `transactional.id` is `my.txid.`++`{0-4}`+.
137+
138+
IMPORTANT: When using `KafkaTransactionManager` with the `ConcurrentMessageListenerContainer` and enabling `maxCache`, it is necessary to set `maxCache` to a value greater than or equal to `concurrency`.
139+
If a `MessageListenerContainer` is unable to acquire a `transactional.id` suffix, it will throw a `NoProducerAvailableException`.
140+
When using nested transactions in the `ConcurrentMessageListenerContainer`, it is necessary to adjust the maxCache setting to handle the increased number of nested transactions.
141+
110142
[[tx-template-mixed]]
111143
== `KafkaTemplate` Transactional and non-Transactional Publishing
112144

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,9 @@
77
This section covers the changes made from version 3.1 to version 3.2.
88
For changes in earlier version, see xref:appendix/change-history.adoc[Change History].
99

10+
[[x32-tiss]]
11+
=== TransactionIdSuffixStrategy
12+
13+
A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
14+
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
15+
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 67 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2023 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -121,8 +121,6 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
121121

122122
private final Map<String, Object> configs;
123123

124-
private final AtomicInteger transactionIdSuffix = new AtomicInteger();
125-
126124
private final Map<String, BlockingQueue<CloseSafeProducer<K, V>>> cache = new ConcurrentHashMap<>();
127125

128126
private final Map<Thread, CloseSafeProducer<K, V>> threadBoundProducers = new ConcurrentHashMap<>();
@@ -137,6 +135,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
137135

138136
private final AtomicBoolean running = new AtomicBoolean();
139137

138+
private TransactionIdSuffixStrategy transactionIdSuffixStrategy = new DefaultTransactionIdSuffixStrategy(0);
139+
140140
private Supplier<Serializer<K>> keySerializerSupplier;
141141

142142
private Supplier<Serializer<V>> valueSerializerSupplier;
@@ -350,6 +350,16 @@ public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSu
350350
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
351351
}
352352

353+
/**
354+
* Set the transaction suffix strategy.
355+
* @param transactionIdSuffixStrategy the strategy.
356+
* @since 3.2
357+
*/
358+
public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy) {
359+
Assert.notNull(transactionIdSuffixStrategy, "'transactionIdSuffixStrategy' cannot be null");
360+
this.transactionIdSuffixStrategy = transactionIdSuffixStrategy;
361+
}
362+
353363
/**
354364
* If true (default), programmatically provided serializers (via constructor or
355365
* setters) will be configured using the producer configuration. Set to false if the
@@ -404,7 +414,7 @@ public Duration getPhysicalCloseTimeout() {
404414

405415
/**
406416
* Set a prefix for the {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} config. By
407-
* default a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
417+
* default, a {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} value from configs is used
408418
* as a prefix in the target producer configs.
409419
* @param transactionIdPrefix the prefix.
410420
* @since 1.3
@@ -536,9 +546,9 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
536546
producerProperties.putAll(overrideProperties);
537547
producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties);
538548
DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(producerProperties,
539-
getKeySerializerSupplier(),
540-
getValueSerializerSupplier(),
541-
isConfigureSerializers());
549+
getKeySerializerSupplier(),
550+
getValueSerializerSupplier(),
551+
isConfigureSerializers());
542552
newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
543553
newFactory.setProducerPerThread(isProducerPerThread());
544554
for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) {
@@ -690,7 +700,7 @@ public void destroy() {
690700
}
691701
if (producerToClose != null) {
692702
try {
693-
producerToClose.closeDelegate(this.physicalCloseTimeout, this.listeners);
703+
producerToClose.closeDelegate(this.physicalCloseTimeout);
694704
}
695705
catch (Exception e) {
696706
LOGGER.error(e, "Exception while closing producer");
@@ -700,7 +710,7 @@ public void destroy() {
700710
CloseSafeProducer<K, V> next = queue.poll();
701711
while (next != null) {
702712
try {
703-
next.closeDelegate(this.physicalCloseTimeout, this.listeners);
713+
next.closeDelegate(this.physicalCloseTimeout);
704714
}
705715
catch (Exception e) {
706716
LOGGER.error(e, "Exception while closing producer");
@@ -711,7 +721,7 @@ public void destroy() {
711721
this.cache.clear();
712722
this.threadBoundProducers.values().forEach(prod -> {
713723
try {
714-
prod.closeDelegate(this.physicalCloseTimeout, this.listeners);
724+
prod.closeDelegate(this.physicalCloseTimeout);
715725
}
716726
catch (Exception e) {
717727
LOGGER.error(e, "Exception while closing producer");
@@ -769,7 +779,7 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
769779
this.globalLock.lock();
770780
try {
771781
if (this.producer != null && this.producer.closed) {
772-
this.producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
782+
this.producer.closeDelegate(this.physicalCloseTimeout);
773783
this.producer = null;
774784
}
775785
if (this.producer != null && expire(this.producer)) {
@@ -820,6 +830,9 @@ protected Producer<K, V> createKafkaProducer() {
820830
* @since 2.2.13
821831
*/
822832
protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
833+
if (producerToRemove.closed) {
834+
this.listeners.forEach(listener -> listener.producerRemoved(producerToRemove.clientId, producerToRemove));
835+
}
823836
return producerToRemove.closed;
824837
}
825838

@@ -846,7 +859,8 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
846859
}
847860
}
848861
if (cachedProducer == null) {
849-
return doCreateTxProducer(txIdPrefix, "" + this.transactionIdSuffix.getAndIncrement(), this::cacheReturner);
862+
String suffix = this.transactionIdSuffixStrategy.acquireSuffix(txIdPrefix);
863+
return doCreateTxProducer(txIdPrefix, suffix, this::cacheReturner);
850864
}
851865
else {
852866
return cachedProducer;
@@ -856,24 +870,28 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
856870
private boolean expire(CloseSafeProducer<K, V> producer) {
857871
boolean expired = this.maxAge > 0 && System.currentTimeMillis() - producer.created > this.maxAge;
858872
if (expired) {
859-
producer.closeDelegate(this.physicalCloseTimeout, this.listeners);
873+
producer.closeDelegate(this.physicalCloseTimeout);
860874
}
861875
return expired;
862876
}
863877

864878
boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout) {
865879
if (producerToRemove.closed) {
866-
producerToRemove.closeDelegate(timeout, this.listeners);
880+
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
867881
return true;
868882
}
869883
else {
870884
this.globalLock.lock();
871885
try {
886+
if (producerToRemove.epoch != this.epoch.get()) {
887+
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
888+
return true;
889+
}
872890
BlockingQueue<CloseSafeProducer<K, V>> txIdCache = getCache(producerToRemove.txIdPrefix);
873891
if (producerToRemove.epoch != this.epoch.get()
874892
|| (txIdCache != null && !txIdCache.contains(producerToRemove)
875893
&& !txIdCache.offer(producerToRemove))) {
876-
producerToRemove.closeDelegate(timeout, this.listeners);
894+
this.removeTransactionProducer(producerToRemove, timeout, this.listeners);
877895
return true;
878896
}
879897
}
@@ -884,6 +902,12 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
884902
}
885903
}
886904

905+
private void removeTransactionProducer(CloseSafeProducer<K, V> producer, Duration timeout,
906+
List<Listener<K, V>> listeners) {
907+
this.transactionIdSuffixStrategy.releaseSuffix(producer.txIdPrefix, producer.txIdSuffix);
908+
listeners.forEach(listener -> listener.producerRemoved(producer.clientId, producer));
909+
}
910+
887911
private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
888912
BiPredicate<CloseSafeProducer<K, V>, Duration> remover) {
889913
Producer<K, V> newProducer = createRawProducer(getTxProducerConfigs(prefix + suffix));
@@ -899,10 +923,13 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
899923
newEx.addSuppressed(ex2);
900924
throw newEx; // NOSONAR - lost stack trace
901925
}
926+
finally {
927+
this.transactionIdSuffixStrategy.releaseSuffix(prefix, suffix);
928+
}
902929
throw new KafkaException("initTransactions() failed", ex);
903930
}
904931
CloseSafeProducer<K, V> closeSafeProducer =
905-
new CloseSafeProducer<>(newProducer, remover, prefix, this.physicalCloseTimeout, this.beanName,
932+
new CloseSafeProducer<>(newProducer, remover, prefix, suffix, this.physicalCloseTimeout, this.beanName,
906933
this.epoch.get());
907934
this.listeners.forEach(listener -> listener.producerAdded(closeSafeProducer.clientId, closeSafeProducer));
908935
return closeSafeProducer;
@@ -923,7 +950,7 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache() {
923950
}
924951

925952
@Nullable
926-
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
953+
protected BlockingQueue<CloseSafeProducer<K, V>> getCache(@Nullable String txIdPrefix) {
927954
if (txIdPrefix == null) {
928955
return null;
929956
}
@@ -941,7 +968,7 @@ protected BlockingQueue<CloseSafeProducer<K, V>> getCache(String txIdPrefix) {
941968
public void closeThreadBoundProducer() {
942969
CloseSafeProducer<K, V> tlProducer = this.threadBoundProducers.remove(Thread.currentThread());
943970
if (tlProducer != null) {
944-
tlProducer.closeDelegate(this.physicalCloseTimeout, this.listeners);
971+
tlProducer.closeDelegate(this.physicalCloseTimeout);
945972
}
946973
}
947974

@@ -991,6 +1018,8 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
9911018

9921019
final String txIdPrefix; // NOSONAR
9931020

1021+
final String txIdSuffix; // NOSONAR
1022+
9941023
final long created; // NOSONAR
9951024

9961025
private final Duration closeTimeout;
@@ -1010,14 +1039,21 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
10101039
this(delegate, removeConsumerProducer, null, closeTimeout, factoryName, epoch);
10111040
}
10121041

1042+
CloseSafeProducer(Producer<K, V> delegate, BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer,
1043+
@Nullable String txIdPrefix, Duration closeTimeout, String factoryName, int epoch) {
1044+
1045+
this(delegate, removeProducer, txIdPrefix, null, closeTimeout, factoryName, epoch);
1046+
}
1047+
10131048
CloseSafeProducer(Producer<K, V> delegate,
10141049
BiPredicate<CloseSafeProducer<K, V>, Duration> removeProducer, @Nullable String txIdPrefix,
1015-
Duration closeTimeout, String factoryName, int epoch) {
1050+
@Nullable String txIdSuffix, Duration closeTimeout, String factoryName, int epoch) {
10161051

10171052
Assert.isTrue(!(delegate instanceof CloseSafeProducer), "Cannot double-wrap a producer");
10181053
this.delegate = delegate;
10191054
this.removeProducer = removeProducer;
10201055
this.txIdPrefix = txIdPrefix;
1056+
this.txIdSuffix = txIdSuffix;
10211057
this.closeTimeout = closeTimeout;
10221058
Map<MetricName, ? extends Metric> metrics = delegate.metrics();
10231059
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
@@ -1057,7 +1093,6 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
10571093
}
10581094
callback.onCompletion(metadata, exception);
10591095
}
1060-
10611096
});
10621097
}
10631098

@@ -1159,22 +1194,30 @@ public void close(@Nullable Duration timeout) {
11591194
this.removeProducer.test(this, this.producerFailed instanceof TimeoutException
11601195
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
11611196
: timeout);
1197+
this.delegate.close(timeout == null ? this.closeTimeout : this.producerFailed instanceof TimeoutException
1198+
? CLOSE_TIMEOUT_AFTER_TX_TIMEOUT
1199+
: timeout);
11621200
}
11631201
else {
11641202
this.closed = this.removeProducer.test(this, timeout);
1203+
if (this.closed) {
1204+
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
1205+
}
11651206
}
11661207
}
11671208
}
11681209

1169-
void closeDelegate(Duration timeout, List<Listener<K, V>> listeners) {
1210+
void closeDelegate(Duration timeout) {
11701211
try {
1171-
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
1212+
if (!this.closed) {
1213+
this.delegate.close(timeout == null ? this.closeTimeout : timeout);
1214+
this.closed = true;
1215+
this.removeProducer.test(this, timeout == null ? this.closeTimeout : timeout);
1216+
}
11721217
}
11731218
catch (Exception ex) {
11741219
LOGGER.warn(ex, () -> "Failed to close " + this.delegate);
11751220
}
1176-
listeners.forEach(listener -> listener.producerRemoved(this.clientId, this));
1177-
this.closed = true;
11781221
}
11791222

11801223
@Override

0 commit comments

Comments
 (0)