Skip to content

Commit b3e2b0f

Browse files
feat: support multiplexed session for blind write with single use transaction (#3229)
* feat(spanner): support multiplexed session for blind write with single use transaction. * test(spanner): added test for the support of multiplexed session for blind writes (writeAtLeastOnce) * chore(spanner): lint * fix(spanner): updated the adoption for blind write into GOOGLE_CLOUD_SPANNER_MULTIPLEXED_SESSIONS. * chore: simplify code to make it easier to reuse for later additions * feat(spanner): added flag to control use of multiplexed session for blind write. This flag will be used by systest. * lint(spanner): javadoc fixes. --------- Co-authored-by: Knut Olav Løite <[email protected]>
1 parent c54abd1 commit b3e2b0f

File tree

10 files changed

+296
-18
lines changed

10 files changed

+296
-18
lines changed

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -803,10 +803,13 @@ private synchronized Spanner getClient(long timeoutSeconds, boolean useMultiplex
803803
.setTotalTimeout(rpcTimeout)
804804
.build();
805805

806-
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions =
807-
SessionPoolOptionsHelper.setUseMultiplexedSession(
808-
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession)
809-
.build();
806+
com.google.cloud.spanner.SessionPoolOptions.Builder poolOptionsBuilder =
807+
com.google.cloud.spanner.SessionPoolOptions.newBuilder();
808+
SessionPoolOptionsHelper.setUseMultiplexedSession(
809+
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
810+
SessionPoolOptionsHelper.setUseMultiplexedSessionBlindWrite(
811+
com.google.cloud.spanner.SessionPoolOptions.newBuilder(), useMultiplexedSession);
812+
com.google.cloud.spanner.SessionPoolOptions sessionPoolOptions = poolOptionsBuilder.build();
810813
// Cloud Spanner Client does not support global retry settings,
811814
// Thus, we need to add retry settings to each individual stub.
812815
SpannerOptions.Builder optionsBuilder =

google-cloud-spanner-executor/src/main/java/com/google/cloud/spanner/SessionPoolOptionsHelper.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,12 @@ public static SessionPoolOptions.Builder setUseMultiplexedSession(
3030
SessionPoolOptions.Builder sessionPoolOptionsBuilder, boolean useMultiplexedSession) {
3131
return sessionPoolOptionsBuilder.setUseMultiplexedSession(useMultiplexedSession);
3232
}
33+
34+
// TODO: Remove when multiplexed session for blind write is released.
35+
public static SessionPoolOptions.Builder setUseMultiplexedSessionBlindWrite(
36+
SessionPoolOptions.Builder sessionPoolOptionsBuilder,
37+
boolean useMultiplexedSessionBlindWrite) {
38+
return sessionPoolOptionsBuilder.setUseMultiplexedSessionBlindWrite(
39+
useMultiplexedSessionBlindWrite);
40+
}
3341
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,7 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
5353

5454
@Override
5555
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
56-
throw new UnsupportedOperationException();
57-
}
58-
59-
@Override
60-
public CommitResponse writeAtLeastOnceWithOptions(
61-
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
62-
throw new UnsupportedOperationException();
56+
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
6357
}
6458

6559
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,37 @@ class DatabaseClientImpl implements DatabaseClient {
3737
@VisibleForTesting final SessionPool pool;
3838
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
3939

40+
final boolean useMultiplexedSessionBlindWrite;
41+
4042
@VisibleForTesting
4143
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
42-
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
44+
this(
45+
"",
46+
pool,
47+
/* useMultiplexedSessionBlindWrite = */ false,
48+
/* multiplexedSessionDatabaseClient = */ null,
49+
tracer);
4350
}
4451

4552
@VisibleForTesting
4653
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
47-
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
54+
this(
55+
clientId,
56+
pool,
57+
/* useMultiplexedSessionBlindWrite = */ false,
58+
/* multiplexedSessionDatabaseClient = */ null,
59+
tracer);
4860
}
4961

5062
DatabaseClientImpl(
5163
String clientId,
5264
SessionPool pool,
65+
boolean useMultiplexedSessionBlindWrite,
5366
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
5467
TraceWrapper tracer) {
5568
this.clientId = clientId;
5669
this.pool = pool;
70+
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
5771
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
5872
this.tracer = tracer;
5973
}
@@ -65,13 +79,21 @@ PooledSessionFuture getSession() {
6579

6680
@VisibleForTesting
6781
DatabaseClient getMultiplexedSession() {
68-
if (this.multiplexedSessionDatabaseClient != null
69-
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) {
82+
if (canUseMultiplexedSessions()) {
7083
return this.multiplexedSessionDatabaseClient;
7184
}
7285
return pool.getMultiplexedSessionWithFallback();
7386
}
7487

88+
private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() {
89+
return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null;
90+
}
91+
92+
private boolean canUseMultiplexedSessions() {
93+
return this.multiplexedSessionDatabaseClient != null
94+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
95+
}
96+
7597
@Override
7698
public Dialect getDialect() {
7799
return pool.getDialect();
@@ -114,6 +136,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
114136
throws SpannerException {
115137
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
116138
try (IScope s = tracer.withSpan(span)) {
139+
if (useMultiplexedSessionBlindWrite && getMultiplexedSessionDatabaseClient() != null) {
140+
return getMultiplexedSessionDatabaseClient()
141+
.writeAtLeastOnceWithOptions(mutations, options);
142+
}
117143
return runWithSessionRetry(
118144
session -> session.writeAtLeastOnceWithOptions(mutations, options));
119145
} catch (RuntimeException e) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
2424
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
25+
import com.google.cloud.spanner.Options.TransactionOption;
2526
import com.google.common.util.concurrent.MoreExecutors;
27+
import java.util.concurrent.ExecutionException;
2628

2729
/**
2830
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
@@ -119,4 +121,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
119121
.readOnlyTransaction(bound),
120122
MoreExecutors.directExecutor()));
121123
}
124+
125+
/**
126+
* This is a blocking method, as the interface that it implements is also defined as a blocking
127+
* method.
128+
*/
129+
@Override
130+
public CommitResponse writeAtLeastOnceWithOptions(
131+
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
132+
SessionReference sessionReference = getSessionReference();
133+
try (MultiplexedSessionTransaction transaction =
134+
new MultiplexedSessionTransaction(client, span, sessionReference, NO_CHANNEL_HINT, true)) {
135+
return transaction.writeAtLeastOnceWithOptions(mutations, options);
136+
}
137+
}
138+
139+
/**
140+
* Gets the session reference that this delayed transaction is waiting for. This method should
141+
* only be called by methods that are allowed to be blocking.
142+
*/
143+
private SessionReference getSessionReference() {
144+
try {
145+
return this.sessionFuture.get();
146+
} catch (ExecutionException executionException) {
147+
// Propagate the underlying exception as a RuntimeException (SpannerException is also a
148+
// RuntimeException).
149+
if (executionException.getCause() instanceof RuntimeException) {
150+
throw (RuntimeException) executionException.getCause();
151+
}
152+
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
153+
} catch (InterruptedException interruptedException) {
154+
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
155+
}
156+
}
122157
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
24+
import com.google.cloud.spanner.Options.TransactionOption;
2425
import com.google.cloud.spanner.SessionClient.SessionConsumer;
2526
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
2627
import com.google.common.annotations.VisibleForTesting;
@@ -107,6 +108,14 @@ void onReadDone() {
107108
}
108109
}
109110

111+
@Override
112+
public CommitResponse writeAtLeastOnceWithOptions(
113+
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
114+
CommitResponse response = super.writeAtLeastOnceWithOptions(mutations, options);
115+
onTransactionDone();
116+
return response;
117+
}
118+
110119
@Override
111120
void onTransactionDone() {
112121
boolean markedDone = false;
@@ -358,6 +367,13 @@ private int getSingleUseChannelHint() {
358367
}
359368
}
360369

370+
@Override
371+
public CommitResponse writeAtLeastOnceWithOptions(
372+
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
373+
return createMultiplexedSessionTransaction(true)
374+
.writeAtLeastOnceWithOptions(mutations, options);
375+
}
376+
361377
@Override
362378
public ReadContext singleUse() {
363379
return createMultiplexedSessionTransaction(true).singleUse();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ public class SessionPoolOptions {
7373

7474
private final boolean useMultiplexedSession;
7575

76+
/**
77+
* Controls whether multiplexed session is enabled for blind write or not. This is only used for
78+
* systest soak. TODO: Remove when multiplexed session for blind write is released.
79+
*/
80+
private final boolean useMultiplexedSessionBlindWrite;
81+
7682
private final boolean useMultiplexedSessionForRW;
7783

7884
// TODO: Change to use java.time.Duration.
@@ -110,6 +116,7 @@ private SessionPoolOptions(Builder builder) {
110116
(useMultiplexedSessionFromEnvVariable != null)
111117
? useMultiplexedSessionFromEnvVariable
112118
: builder.useMultiplexedSession;
119+
this.useMultiplexedSessionBlindWrite = builder.useMultiplexedSessionBlindWrite;
113120
// useMultiplexedSessionForRW priority => Environment var > private setter > client default
114121
Boolean useMultiplexedSessionForRWFromEnvVariable =
115122
getUseMultiplexedSessionForRWFromEnvVariable();
@@ -184,6 +191,7 @@ public int hashCode() {
184191
this.inactiveTransactionRemovalOptions,
185192
this.poolMaintainerClock,
186193
this.useMultiplexedSession,
194+
this.useMultiplexedSessionBlindWrite,
187195
this.useMultiplexedSessionForRW,
188196
this.multiplexedSessionMaintenanceDuration);
189197
}
@@ -318,6 +326,12 @@ public boolean getUseMultiplexedSession() {
318326
return useMultiplexedSession;
319327
}
320328

329+
@VisibleForTesting
330+
@InternalApi
331+
protected boolean getUseMultiplexedSessionBlindWrite() {
332+
return getUseMultiplexedSession() && useMultiplexedSessionBlindWrite;
333+
}
334+
321335
@VisibleForTesting
322336
@InternalApi
323337
public boolean getUseMultiplexedSessionForRW() {
@@ -554,6 +568,9 @@ public static class Builder {
554568
// Set useMultiplexedSession to true to make multiplexed session the default.
555569
private boolean useMultiplexedSession = false;
556570

571+
// TODO: Remove when multiplexed session for blind write is released.
572+
private boolean useMultiplexedSessionBlindWrite = false;
573+
557574
// This field controls the default behavior of session management for RW operations in Java
558575
// client.
559576
// Set useMultiplexedSessionForRW to true to make multiplexed session for RW operations the
@@ -601,6 +618,7 @@ private Builder(SessionPoolOptions options) {
601618
this.randomizePositionQPSThreshold = options.randomizePositionQPSThreshold;
602619
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
603620
this.useMultiplexedSession = options.useMultiplexedSession;
621+
this.useMultiplexedSessionBlindWrite = options.useMultiplexedSessionBlindWrite;
604622
this.useMultiplexedSessionForRW = options.useMultiplexedSessionForRW;
605623
this.multiplexedSessionMaintenanceDuration = options.multiplexedSessionMaintenanceDuration;
606624
this.poolMaintainerClock = options.poolMaintainerClock;
@@ -789,6 +807,17 @@ Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
789807
return this;
790808
}
791809

810+
/**
811+
* This method enables multiplexed sessions for blind writes. This method will be removed in the
812+
* future when multiplexed sessions has been made the default for all operations.
813+
*/
814+
@InternalApi
815+
@VisibleForTesting
816+
Builder setUseMultiplexedSessionBlindWrite(boolean useMultiplexedSessionBlindWrite) {
817+
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
818+
return this;
819+
}
820+
792821
/**
793822
* Sets whether the client should use multiplexed session for R/W operations or not. This method
794823
* is intentionally package-private and intended for internal use.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerImpl.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,11 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
303303
numMultiplexedSessionsReleased);
304304
pool.maybeWaitOnMinSessions();
305305
DatabaseClientImpl dbClient =
306-
createDatabaseClient(clientId, pool, multiplexedSessionDatabaseClient);
306+
createDatabaseClient(
307+
clientId,
308+
pool,
309+
getOptions().getSessionPoolOptions().getUseMultiplexedSessionBlindWrite(),
310+
multiplexedSessionDatabaseClient);
307311
dbClients.put(db, dbClient);
308312
return dbClient;
309313
}
@@ -314,8 +318,10 @@ public DatabaseClient getDatabaseClient(DatabaseId db) {
314318
DatabaseClientImpl createDatabaseClient(
315319
String clientId,
316320
SessionPool pool,
321+
boolean useMultiplexedSessionBlindWrite,
317322
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionClient) {
318-
return new DatabaseClientImpl(clientId, pool, multiplexedSessionClient, tracer);
323+
return new DatabaseClientImpl(
324+
clientId, pool, useMultiplexedSessionBlindWrite, multiplexedSessionClient, tracer);
319325
}
320326

321327
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/IntegrationTestWithClosedSessionsEnv.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ private static class SpannerWithClosedSessionsImpl extends SpannerImpl {
4747

4848
@Override
4949
DatabaseClientImpl createDatabaseClient(
50-
String clientId, SessionPool pool, MultiplexedSessionDatabaseClient ignore) {
50+
String clientId,
51+
SessionPool pool,
52+
boolean useMultiplexedSessionBlindWriteIgnore,
53+
MultiplexedSessionDatabaseClient ignore) {
5154
return new DatabaseClientWithClosedSessionImpl(clientId, pool, tracer);
5255
}
5356
}

0 commit comments

Comments
 (0)