Skip to content

Commit 550e6b1

Browse files
authored
GH-2981: Suppress abortTransaction exception
Fixes: #2981 When `producer.abortTransaction()` fails, the original exception is lost in `KafkaTemplate`. * Catch an exception on `producer.abortTransaction()` and `ex.addSuppressed(abortException)` **Cherry-pick to `3.0.x`**
1 parent 8cffe22 commit 550e6b1

File tree

2 files changed

+34
-4
lines changed

2 files changed

+34
-4
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -657,9 +657,14 @@ public <T> T executeInTransaction(OperationsCallback<K, V, T> callback) {
657657
catch (SkipAbortException e) { // NOSONAR - exception flow control
658658
throw ((RuntimeException) e.getCause()); // NOSONAR - lost stack trace
659659
}
660-
catch (Exception e) {
661-
producer.abortTransaction();
662-
throw e;
660+
catch (Exception ex) {
661+
try {
662+
producer.abortTransaction();
663+
}
664+
catch (Exception abortException) {
665+
ex.addSuppressed(abortException);
666+
}
667+
throw ex;
663668
}
664669
finally {
665670
this.producers.remove(currentThread);

spring-kafka/src/test/java/org/springframework/kafka/core/KafkaTemplateTransactionTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2017-2023 the original author or authors.
2+
* Copyright 2017-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -506,6 +506,31 @@ public void testAbort() {
506506
verify(producer, never()).commitTransaction();
507507
}
508508

509+
@Test
510+
public void abortFiledOriginalExceptionRethrown() {
511+
MockProducer<String, String> producer = spy(new MockProducer<>());
512+
producer.initTransactions();
513+
producer.abortTransactionException = new RuntimeException("abort failed");
514+
515+
ProducerFactory<String, String> pf = new MockProducerFactory<>((tx, id) -> producer, null);
516+
517+
KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);
518+
template.setDefaultTopic(STRING_KEY_TOPIC);
519+
520+
assertThatExceptionOfType(RuntimeException.class)
521+
.isThrownBy(() ->
522+
template.executeInTransaction(t -> {
523+
throw new RuntimeException("intentional");
524+
}))
525+
.withMessage("intentional")
526+
.withStackTraceContaining("abort failed");
527+
528+
assertThat(producer.transactionCommitted()).isFalse();
529+
assertThat(producer.transactionAborted()).isFalse();
530+
assertThat(producer.closed()).isTrue();
531+
verify(producer, never()).commitTransaction();
532+
}
533+
509534
@Test
510535
public void testExecuteInTransactionNewInnerTx() {
511536
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)