27
27
import com .google .cloud .spanner .Options .TransactionOption ;
28
28
import com .google .cloud .spanner .Options .UpdateOption ;
29
29
import com .google .cloud .spanner .SessionClient .SessionConsumer ;
30
+ import com .google .cloud .spanner .SessionPool .SessionPoolTransactionRunner ;
30
31
import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
31
32
import com .google .common .annotations .VisibleForTesting ;
32
33
import com .google .common .base .Preconditions ;
52
53
import java .util .concurrent .atomic .AtomicLong ;
53
54
import java .util .concurrent .atomic .AtomicReference ;
54
55
56
+ /**
57
+ * {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
58
+ * "Transaction type read_write not supported with multiplexed sessions" by switching from a
59
+ * multiplexed session to a regular session and then restarts the transaction.
60
+ */
61
+ class MultiplexedSessionTransactionRunner implements TransactionRunner {
62
+ private final SessionPool sessionPool ;
63
+ private final TransactionRunnerImpl transactionRunnerForMultiplexedSession ;
64
+ private SessionPoolTransactionRunner transactionRunnerForRegularSession ;
65
+ private final TransactionOption [] options ;
66
+ private boolean isUsingMultiplexedSession = true ;
67
+
68
+ public MultiplexedSessionTransactionRunner (
69
+ SessionImpl multiplexedSession , SessionPool sessionPool , TransactionOption ... options ) {
70
+ this .sessionPool = sessionPool ;
71
+ this .transactionRunnerForMultiplexedSession =
72
+ new TransactionRunnerImpl (
73
+ multiplexedSession , options ); // Uses multiplexed session initially
74
+ multiplexedSession .setActive (this .transactionRunnerForMultiplexedSession );
75
+ this .options = options ;
76
+ }
77
+
78
+ private TransactionRunner getRunner () {
79
+ if (this .isUsingMultiplexedSession ) {
80
+ return this .transactionRunnerForMultiplexedSession ;
81
+ } else {
82
+ if (this .transactionRunnerForRegularSession == null ) {
83
+ this .transactionRunnerForRegularSession =
84
+ new SessionPoolTransactionRunner <>(
85
+ sessionPool .getSession (),
86
+ sessionPool .getPooledSessionReplacementHandler (),
87
+ options );
88
+ }
89
+ return this .transactionRunnerForRegularSession ;
90
+ }
91
+ }
92
+
93
+ @ Override
94
+ public <T > T run (TransactionCallable <T > callable ) {
95
+ while (true ) {
96
+ try {
97
+ return getRunner ().run (callable );
98
+ } catch (SpannerException e ) {
99
+ if (e .getErrorCode () == ErrorCode .UNIMPLEMENTED
100
+ && verifyUnimplementedErrorMessageForRWMux (e )) {
101
+ this .isUsingMultiplexedSession = false ; // Fallback to regular session
102
+ } else {
103
+ throw e ; // Other errors propagate
104
+ }
105
+ }
106
+ }
107
+ }
108
+
109
+ @ Override
110
+ public Timestamp getCommitTimestamp () {
111
+ return getRunner ().getCommitTimestamp ();
112
+ }
113
+
114
+ @ Override
115
+ public CommitResponse getCommitResponse () {
116
+ return getRunner ().getCommitResponse ();
117
+ }
118
+
119
+ @ Override
120
+ public TransactionRunner allowNestedTransaction () {
121
+ getRunner ().allowNestedTransaction ();
122
+ return this ;
123
+ }
124
+
125
+ private boolean verifyUnimplementedErrorMessageForRWMux (SpannerException spannerException ) {
126
+ if (spannerException .getCause () == null ) {
127
+ return false ;
128
+ }
129
+ if (spannerException .getCause ().getMessage () == null ) {
130
+ return false ;
131
+ }
132
+ return spannerException
133
+ .getCause ()
134
+ .getMessage ()
135
+ .contains ("Transaction type read_write not supported with multiplexed sessions" );
136
+ }
137
+ }
138
+
55
139
/**
56
140
* {@link DatabaseClient} implementation that uses a single multiplexed session to execute
57
141
* transactions.
@@ -75,18 +159,30 @@ static class MultiplexedSessionTransaction extends SessionImpl {
75
159
private final int singleUseChannelHint ;
76
160
77
161
private boolean done ;
162
+ private final SessionPool pool ;
78
163
79
164
MultiplexedSessionTransaction (
80
165
MultiplexedSessionDatabaseClient client ,
81
166
ISpan span ,
82
167
SessionReference sessionReference ,
83
168
int singleUseChannelHint ,
84
169
boolean singleUse ) {
170
+ this (client , span , sessionReference , singleUseChannelHint , singleUse , null );
171
+ }
172
+
173
+ MultiplexedSessionTransaction (
174
+ MultiplexedSessionDatabaseClient client ,
175
+ ISpan span ,
176
+ SessionReference sessionReference ,
177
+ int singleUseChannelHint ,
178
+ boolean singleUse ,
179
+ SessionPool pool ) {
85
180
super (client .sessionClient .getSpanner (), sessionReference , singleUseChannelHint );
86
181
this .client = client ;
87
182
this .singleUse = singleUse ;
88
183
this .singleUseChannelHint = singleUseChannelHint ;
89
184
this .client .numSessionsAcquired .incrementAndGet ();
185
+ this .pool = pool ;
90
186
setCurrentSpan (span );
91
187
}
92
188
@@ -134,6 +230,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
134
230
return response ;
135
231
}
136
232
233
+ @ Override
234
+ public TransactionRunner readWriteTransaction (TransactionOption ... options ) {
235
+ return new MultiplexedSessionTransactionRunner (this , pool , options );
236
+ }
237
+
137
238
@ Override
138
239
void onTransactionDone () {
139
240
boolean markedDone = false ;
@@ -225,6 +326,8 @@ public void close() {
225
326
*/
226
327
@ VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean (false );
227
328
329
+ private SessionPool pool ;
330
+
228
331
MultiplexedSessionDatabaseClient (SessionClient sessionClient ) {
229
332
this (sessionClient , Clock .systemUTC ());
230
333
}
@@ -299,6 +402,10 @@ public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount
299
402
initialSessionReferenceFuture );
300
403
}
301
404
405
+ void setPool (SessionPool pool ) {
406
+ this .pool = pool ;
407
+ }
408
+
302
409
private static void maybeWaitForSessionCreation (
303
410
SessionPoolOptions sessionPoolOptions , ApiFuture <SessionReference > future ) {
304
411
Duration waitDuration = sessionPoolOptions .getWaitForMinSessions ();
@@ -489,7 +596,8 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
489
596
// any special handling of such errors.
490
597
multiplexedSessionReference .get ().get (),
491
598
singleUse ? getSingleUseChannelHint () : NO_CHANNEL_HINT ,
492
- singleUse );
599
+ singleUse ,
600
+ this .pool );
493
601
} catch (ExecutionException executionException ) {
494
602
throw SpannerExceptionFactory .asSpannerException (executionException .getCause ());
495
603
} catch (InterruptedException interruptedException ) {
@@ -499,7 +607,7 @@ private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
499
607
500
608
private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction () {
501
609
return new DelayedMultiplexedSessionTransaction (
502
- this , tracer .getCurrentSpan (), multiplexedSessionReference .get ());
610
+ this , tracer .getCurrentSpan (), multiplexedSessionReference .get (), this . pool );
503
611
}
504
612
505
613
private int getSingleUseChannelHint () {
0 commit comments