Skip to content

Commit 1d80cbe

Browse files
mp911dejhoeller
authored andcommitted
Replace signal materialization in TransactionAspectSupport with usingWhen
We now use Flux.usingWhen() instead materialize/dematerialize operators to reuse Reactor's resource closure. Until usingWhen() accepts a BiFunction to consume error signals, we need to map error signals outside of usingWhen which requires re-wrapping of the ReactiveTransaction object. Also, reuse the current TransactionContext to leave Transaction creation/propagation entirely to ReactiveTransactionManager instead of creating new TransactionContexts.
1 parent 28c5d7b commit 1d80cbe

File tree

3 files changed

+32
-20
lines changed

3 files changed

+32
-20
lines changed

spring-tx/src/main/java/org/springframework/transaction/interceptor/TransactionAspectSupport.java

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -834,15 +834,18 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
834834
try {
835835
// This is an around advice: Invoke the next interceptor in the chain.
836836
// This will normally result in a target object being invoked.
837-
Mono<Object> retVal = (Mono) invocation.proceedWithInvocation();
838-
return retVal
839-
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex))).materialize()
840-
.flatMap(signal -> {
841-
if (signal.isOnComplete() || signal.isOnNext()) {
842-
return commitTransactionAfterReturning(it).thenReturn(signal);
843-
}
844-
return Mono.just(signal);
845-
}).dematerialize();
837+
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
838+
// through usingWhen.
839+
return Mono.<Object, ReactiveTransactionInfo>usingWhen(Mono.just(it), s -> {
840+
try {
841+
return (Mono) invocation.proceedWithInvocation();
842+
}
843+
catch (Throwable throwable) {
844+
return Mono.error(throwable);
845+
}
846+
}, this::commitTransactionAfterReturning, s -> Mono.empty())
847+
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex)
848+
.then(Mono.error(ex)));
846849
}
847850
catch (Throwable ex) {
848851
// target invocation exception
@@ -860,15 +863,19 @@ public Object invokeWithinTransaction(Method method, @Nullable Class<?> targetCl
860863
try {
861864
// This is an around advice: Invoke the next interceptor in the chain.
862865
// This will normally result in a target object being invoked.
863-
Flux<Object> retVal = Flux.from(this.adapter.toPublisher(invocation.proceedWithInvocation()));
864-
return retVal
865-
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex).then(Mono.error(ex)))
866-
.materialize().flatMap(signal -> {
867-
if (signal.isOnComplete()) {
868-
return commitTransactionAfterReturning(it).materialize();
869-
}
870-
return Mono.just(signal);
871-
}).dematerialize();
866+
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
867+
// through usingWhen.
868+
return Flux.usingWhen(Mono.just(it), s -> {
869+
try {
870+
return this.adapter.toPublisher(
871+
invocation.proceedWithInvocation());
872+
}
873+
catch (Throwable throwable) {
874+
return Mono.error(throwable);
875+
}
876+
}, this::commitTransactionAfterReturning, s -> Mono.empty())
877+
.onErrorResume(ex -> completeTransactionAfterThrowing(it, ex)
878+
.then(Mono.error(ex)));
872879
}
873880
catch (Throwable ex) {
874881
// target invocation exception

spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionContextManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public static Function<Context, Context> getOrCreateContext() {
8888
return context -> {
8989
TransactionContextHolder holder = context.get(TransactionContextHolder.class);
9090
if (holder.hasContext()) {
91-
context.put(TransactionContext.class, holder.currentContext());
91+
return context.put(TransactionContext.class, holder.currentContext());
9292
}
9393
return context.put(TransactionContext.class, holder.createContext());
9494
};

spring-tx/src/test/java/org/springframework/transaction/interceptor/AbstractReactiveTransactionAspectTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import static org.assertj.core.api.Assertions.*;
3535
import static org.assertj.core.api.Fail.fail;
36+
import static org.junit.Assert.*;
3637
import static org.mockito.Mockito.*;
3738

3839
/**
@@ -321,6 +322,7 @@ public void cannotCommitTransaction() throws Exception {
321322
when(rtm.getReactiveTransaction(txatt)).thenReturn(Mono.just(status));
322323
UnexpectedRollbackException ex = new UnexpectedRollbackException("foobar", null);
323324
when(rtm.commit(status)).thenReturn(Mono.error(ex));
325+
when(rtm.rollback(status)).thenReturn(Mono.empty());
324326

325327
DefaultTestBean tb = new DefaultTestBean();
326328
TestBean itb = (TestBean) advised(tb, rtm, tas);
@@ -329,7 +331,10 @@ public void cannotCommitTransaction() throws Exception {
329331

330332
Mono.from(itb.setName(name))
331333
.as(StepVerifier::create)
332-
.expectError(UnexpectedRollbackException.class)
334+
.consumeErrorWith(throwable -> {
335+
assertEquals(RuntimeException.class, throwable.getClass());
336+
assertEquals(ex, throwable.getCause());
337+
})
333338
.verify();
334339

335340
// Should have invoked target and changed name

0 commit comments

Comments
 (0)