Skip to content

Commit 8304653

Browse files
mp911dejhoeller
authored andcommitted
Replace signal materialization in TransactionalOperator with usingWhen
We now use Flux.usingWhen() instead materialize/dematerialize operators to reuse Reactor's resource closure. Until usingWhen() accepts a BiFunction to consume error signals, we need to map error signals outside of usingWhen which requires re-wrapping of the ReactiveTransaction object.
1 parent 9cff07c commit 8304653

File tree

2 files changed

+7
-10
lines changed

2 files changed

+7
-10
lines changed

spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -78,14 +78,11 @@ public <T> Flux<T> execute(TransactionCallback<T> action) throws TransactionExce
7878
return status.flatMapMany(it -> {
7979
// This is an around advice: Invoke the next interceptor in the chain.
8080
// This will normally result in a target object being invoked.
81-
Flux<Object> retVal = Flux.from(action.doInTransaction(it));
82-
return retVal.onErrorResume(ex -> rollbackOnException(it, ex).
83-
then(Mono.error(ex))).materialize().flatMap(signal -> {
84-
if (signal.isOnComplete()) {
85-
return this.transactionManager.commit(it).materialize();
86-
}
87-
return Mono.just(signal);
88-
}).<T>dematerialize();
81+
// Need re-wrapping of ReactiveTransaction until we get hold of the exception
82+
// through usingWhen.
83+
return Flux.usingWhen(Mono.just(it), action::doInTransaction,
84+
this.transactionManager::commit, s -> Mono.empty())
85+
.onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)));
8986
});
9087
})
9188
.subscriberContext(TransactionContextManager.getOrCreateContext())

spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public void rollbackWithMono() {
5959
@Test
6060
public void commitWithFlux() {
6161
TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());
62-
Flux.just(true).as(operator::transactional)
62+
Flux.just(1, 2, 3, 4).as(operator::transactional)
6363
.as(StepVerifier::create)
64-
.expectNext(true)
64+
.expectNextCount(4)
6565
.verifyComplete();
6666
assertTrue(tm.commit);
6767
assertFalse(tm.rollback);

0 commit comments

Comments
 (0)