24
24
import io .r2dbc .spi .Option ;
25
25
import io .r2dbc .spi .R2dbcException ;
26
26
import io .r2dbc .spi .Result ;
27
- import org .reactivestreams .Publisher ;
28
27
import reactor .core .publisher .Mono ;
29
28
30
29
import org .springframework .beans .factory .InitializingBean ;
38
37
import org .springframework .util .Assert ;
39
38
40
39
/**
41
- * {@link org.springframework.transaction.ReactiveTransactionManager}
42
- * implementation for a single R2DBC {@link ConnectionFactory}. This class is
43
- * capable of working in any environment with any R2DBC driver, as long as the
44
- * setup uses a {@code ConnectionFactory} as its {@link Connection} factory
45
- * mechanism. Binds a R2DBC {@code Connection} from the specified
46
- * {@code ConnectionFactory} to the current subscriber context, potentially
47
- * allowing for one context-bound {@code Connection} per {@code ConnectionFactory}.
40
+ * {@link org.springframework.transaction.ReactiveTransactionManager} implementation
41
+ * for a single R2DBC {@link ConnectionFactory}. This class is capable of working
42
+ * in any environment with any R2DBC driver, as long as the setup uses a
43
+ * {@code ConnectionFactory} as its {@link Connection} factory mechanism.
44
+ * Binds a R2DBC {@code Connection} from the specified {@code ConnectionFactory}
45
+ * to the current subscriber context, potentially allowing for one context-bound
46
+ * {@code Connection} per {@code ConnectionFactory}.
48
47
*
49
- * <p><b>Note: The {@code ConnectionFactory} that this transaction manager
50
- * operates on needs to return independent {@code Connection}s.</b>
51
- * The {@code Connection}s may come from a pool (the typical case), but the
52
- * {@code ConnectionFactory} must not return scoped {@code Connection}s
53
- * or the like. This transaction manager will associate {@code Connection}
54
- * with context-bound transactions itself, according to the specified propagation
55
- * behavior. It assumes that a separate, independent {@code Connection} can
56
- * be obtained even during an ongoing transaction.
48
+ * <p><b>Note: The {@code ConnectionFactory} that this transaction manager operates
49
+ * on needs to return independent {@code Connection}s.</b> The {@code Connection}s
50
+ * typically come from a connection pool but the {@code ConnectionFactory} must not
51
+ * return specifically scoped or constrained {@code Connection}s. This transaction
52
+ * manager will associate {@code Connection} with context-bound transactions,
53
+ * according to the specified propagation behavior. It assumes that a separate,
54
+ * independent {@code Connection} can be obtained even during an ongoing transaction.
57
55
*
58
56
* <p>Application code is required to retrieve the R2DBC Connection via
59
57
* {@link ConnectionFactoryUtils#getConnection(ConnectionFactory)}
60
58
* instead of a standard R2DBC-style {@link ConnectionFactory#create()} call.
61
59
* Spring classes such as {@code DatabaseClient} use this strategy implicitly.
62
60
* If not used in combination with this transaction manager, the
63
- * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the
64
- * native {@code ConnectionFactory} lookup; it can thus be used in a portable fashion.
61
+ * {@link ConnectionFactoryUtils} lookup strategy behaves exactly like the native
62
+ * {@code ConnectionFactory} lookup; it can thus be used in a portable fashion.
65
63
*
66
- * <p>Alternatively, you can allow application code to work with the standard
67
- * R2DBC lookup pattern {@link ConnectionFactory#create()}, for example for code
68
- * that is not aware of Spring at all. In that case, define a
69
- * {@link TransactionAwareConnectionFactoryProxy} for your target {@code ConnectionFactory},
70
- * and pass that proxy {@code ConnectionFactory} to your DAOs, which will automatically
71
- * participate in Spring-managed transactions when accessing it.
64
+ * <p>Alternatively, you can allow application code to work with the lookup pattern
65
+ * {@link ConnectionFactory#create()}, for example for code not aware of Spring.
66
+ * In that case, define a {@link TransactionAwareConnectionFactoryProxy} for your
67
+ * target {@code ConnectionFactory}, and pass that proxy {@code ConnectionFactory}
68
+ * to your DAOs which will automatically participate in Spring-managed transactions
69
+ * when accessing it.
72
70
*
73
- * <p>This transaction manager triggers flush callbacks on registered transaction
74
- * synchronizations (if synchronization is generally active), assuming resources
75
- * operating on the underlying R2DBC {@code Connection}.
76
- *
77
- * <p>Spring's {@code TransactionDefinition} attributes are carried forward to R2DBC drivers
78
- * using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}. Subclasses may
79
- * override {@link #createTransactionDefinition(TransactionDefinition)} to customize
80
- * transaction definitions for vendor-specific attributes.
71
+ * <p>Spring's {@code TransactionDefinition} attributes are carried forward to
72
+ * R2DBC drivers using extensible R2DBC {@link io.r2dbc.spi.TransactionDefinition}.
73
+ * Subclasses may override {@link #createTransactionDefinition(TransactionDefinition)}
74
+ * to customize transaction definitions for vendor-specific attributes. As of 6.0.10,
75
+ * this transaction manager supports nested transactions via R2DBC savepoints as well.
81
76
*
82
77
* @author Mark Paluch
83
78
* @author Juergen Hoeller
@@ -97,7 +92,7 @@ public class R2dbcTransactionManager extends AbstractReactiveTransactionManager
97
92
98
93
/**
99
94
* Create a new {@code R2dbcTransactionManager} instance.
100
- * A ConnectionFactory has to be set to be able to use it.
95
+ * A {@code ConnectionFactory} has to be set to be able to use it.
101
96
* @see #setConnectionFactory
102
97
*/
103
98
public R2dbcTransactionManager () {}
@@ -114,12 +109,13 @@ public R2dbcTransactionManager(ConnectionFactory connectionFactory) {
114
109
115
110
116
111
/**
117
- * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions for.
118
- * <p>This will typically be a locally defined {@code ConnectionFactory}, for example a connection pool.
119
- * <p><b>The {@code ConnectionFactory} passed in here needs to return independent {@link Connection}s.</b>
120
- * The {@code Connection}s may come from a pool (the typical case), but the {@code ConnectionFactory}
121
- * must not return scoped {@code Connection}s or the like.
122
- * @see TransactionAwareConnectionFactoryProxy
112
+ * Set the R2DBC {@link ConnectionFactory} that this instance should manage transactions
113
+ * for. This will typically be a locally defined {@code ConnectionFactory}, for example
114
+ * an R2DBC connection pool.
115
+ * <p><b>The {@code ConnectionFactory} passed in here needs to return independent
116
+ * {@link Connection}s.</b> The {@code Connection}s typically come from a connection
117
+ * pool but the {@code ConnectionFactory} must not return specifically scoped or
118
+ * constrained {@code Connection}s.
123
119
*/
124
120
public void setConnectionFactory (@ Nullable ConnectionFactory connectionFactory ) {
125
121
this .connectionFactory = connectionFactory ;
@@ -183,8 +179,7 @@ protected Object doGetTransaction(TransactionSynchronizationManager synchronizat
183
179
184
180
@ Override
185
181
protected boolean isExistingTransaction (Object transaction ) {
186
- ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject ) transaction ;
187
- return (txObject .hasConnectionHolder () && txObject .getConnectionHolder ().isTransactionActive ());
182
+ return ((ConnectionFactoryTransactionObject ) transaction ).isTransactionActive ();
188
183
}
189
184
190
185
@ Override
@@ -193,6 +188,11 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
193
188
194
189
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject ) transaction ;
195
190
191
+ if (definition .getPropagationBehavior () == TransactionDefinition .PROPAGATION_NESTED &&
192
+ txObject .isTransactionActive ()) {
193
+ return txObject .createSavepoint ();
194
+ }
195
+
196
196
return Mono .defer (() -> {
197
197
Mono <Connection > connectionMono ;
198
198
@@ -210,7 +210,7 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
210
210
connectionMono = Mono .just (txObject .getConnectionHolder ().getConnection ());
211
211
}
212
212
213
- return connectionMono .flatMap (con -> Mono . from ( doBegin (definition , con ) )
213
+ return connectionMono .flatMap (con -> doBegin (definition , con )
214
214
.then (prepareTransactionalConnection (con , definition ))
215
215
.doOnSuccess (v -> {
216
216
txObject .getConnectionHolder ().setTransactionActive (true );
@@ -234,12 +234,12 @@ protected Mono<Void> doBegin(TransactionSynchronizationManager synchronizationMa
234
234
}).then ();
235
235
}
236
236
237
- private Publisher <Void > doBegin (TransactionDefinition definition , Connection con ) {
237
+ private Mono <Void > doBegin (TransactionDefinition definition , Connection con ) {
238
238
io .r2dbc .spi .TransactionDefinition transactionDefinition = createTransactionDefinition (definition );
239
239
if (logger .isDebugEnabled ()) {
240
240
logger .debug ("Starting R2DBC transaction on Connection [" + con + "] using [" + transactionDefinition + "]" );
241
241
}
242
- return con .beginTransaction (transactionDefinition );
242
+ return Mono . from ( con .beginTransaction (transactionDefinition ) );
243
243
}
244
244
245
245
/**
@@ -300,25 +300,23 @@ protected Mono<Void> doCommit(TransactionSynchronizationManager TransactionSynch
300
300
GenericReactiveTransaction status ) throws TransactionException {
301
301
302
302
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject ) status .getTransaction ();
303
- Connection connection = txObject .getConnectionHolder ().getConnection ();
304
303
if (status .isDebug ()) {
305
- logger .debug ("Committing R2DBC transaction on Connection [" + connection + "]" );
304
+ logger .debug ("Committing R2DBC transaction on Connection [" +
305
+ txObject .getConnectionHolder ().getConnection () + "]" );
306
306
}
307
- return Mono .from (connection .commitTransaction ())
308
- .onErrorMap (R2dbcException .class , ex -> translateException ("R2DBC commit" , ex ));
307
+ return txObject .commit ().onErrorMap (R2dbcException .class , ex -> translateException ("R2DBC commit" , ex ));
309
308
}
310
309
311
310
@ Override
312
311
protected Mono <Void > doRollback (TransactionSynchronizationManager TransactionSynchronizationManager ,
313
312
GenericReactiveTransaction status ) throws TransactionException {
314
313
315
314
ConnectionFactoryTransactionObject txObject = (ConnectionFactoryTransactionObject ) status .getTransaction ();
316
- Connection connection = txObject .getConnectionHolder ().getConnection ();
317
315
if (status .isDebug ()) {
318
- logger .debug ("Rolling back R2DBC transaction on Connection [" + connection + "]" );
316
+ logger .debug ("Rolling back R2DBC transaction on Connection [" +
317
+ txObject .getConnectionHolder ().getConnection () + "]" );
319
318
}
320
- return Mono .from (connection .rollbackTransaction ())
321
- .onErrorMap (R2dbcException .class , ex -> translateException ("R2DBC rollback" , ex ));
319
+ return txObject .rollback ().onErrorMap (R2dbcException .class , ex -> translateException ("R2DBC rollback" , ex ));
322
320
}
323
321
324
322
@ Override
@@ -496,6 +494,9 @@ private static class ConnectionFactoryTransactionObject {
496
494
497
495
private boolean newConnectionHolder ;
498
496
497
+ @ Nullable
498
+ private String savepointName ;
499
+
499
500
void setConnectionHolder (@ Nullable ConnectionHolder connectionHolder , boolean newConnectionHolder ) {
500
501
setConnectionHolder (connectionHolder );
501
502
this .newConnectionHolder = newConnectionHolder ;
@@ -505,10 +506,6 @@ boolean isNewConnectionHolder() {
505
506
return this .newConnectionHolder ;
506
507
}
507
508
508
- void setRollbackOnly () {
509
- getConnectionHolder ().setRollbackOnly ();
510
- }
511
-
512
509
public void setConnectionHolder (@ Nullable ConnectionHolder connectionHolder ) {
513
510
this .connectionHolder = connectionHolder ;
514
511
}
@@ -521,6 +518,34 @@ public ConnectionHolder getConnectionHolder() {
521
518
public boolean hasConnectionHolder () {
522
519
return (this .connectionHolder != null );
523
520
}
521
+
522
+ public boolean isTransactionActive () {
523
+ return (this .connectionHolder != null && this .connectionHolder .isTransactionActive ());
524
+ }
525
+
526
+ public Mono <Void > createSavepoint () {
527
+ ConnectionHolder holder = getConnectionHolder ();
528
+ this .savepointName = holder .nextSavepoint ();
529
+ return Mono .from (holder .getConnection ().createSavepoint (this .savepointName ));
530
+ }
531
+
532
+ public Mono <Void > commit () {
533
+ Connection connection = getConnectionHolder ().getConnection ();
534
+ return (this .savepointName != null ?
535
+ Mono .from (connection .releaseSavepoint (this .savepointName )) :
536
+ Mono .from (connection .commitTransaction ()));
537
+ }
538
+
539
+ public Mono <Void > rollback () {
540
+ Connection connection = getConnectionHolder ().getConnection ();
541
+ return (this .savepointName != null ?
542
+ Mono .from (connection .rollbackTransactionToSavepoint (this .savepointName )) :
543
+ Mono .from (connection .rollbackTransaction ()));
544
+ }
545
+
546
+ public void setRollbackOnly () {
547
+ getConnectionHolder ().setRollbackOnly ();
548
+ }
524
549
}
525
550
526
551
}
0 commit comments