Skip to content

Commit 1f53d38

Browse files
authored
chore(spanner): add R/W multiplexed session support (#3381)
This PR introduces support for using multiplexed sessions in R/W transactions for the following methods: 1. write 2. writeWithOptions 3. readWriteTransaction 4. transactionManager 5. runAsync 6. transactionManagerAsync
1 parent 4f73bdb commit 1f53d38

17 files changed

+768
-68
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,6 @@ public String getDatabaseRole() {
4040
throw new UnsupportedOperationException();
4141
}
4242

43-
@Override
44-
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
45-
throw new UnsupportedOperationException();
46-
}
47-
48-
@Override
49-
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
50-
throws SpannerException {
51-
throw new UnsupportedOperationException();
52-
}
53-
5443
@Override
5544
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
5645
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
@@ -63,26 +52,6 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
6352
throw new UnsupportedOperationException();
6453
}
6554

66-
@Override
67-
public TransactionRunner readWriteTransaction(TransactionOption... options) {
68-
throw new UnsupportedOperationException();
69-
}
70-
71-
@Override
72-
public TransactionManager transactionManager(TransactionOption... options) {
73-
throw new UnsupportedOperationException();
74-
}
75-
76-
@Override
77-
public AsyncRunner runAsync(TransactionOption... options) {
78-
throw new UnsupportedOperationException();
79-
}
80-
81-
@Override
82-
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
83-
throw new UnsupportedOperationException();
84-
}
85-
8655
@Override
8756
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
8857
throw new UnsupportedOperationException();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ public ApiFuture<Void> closeAsync() {
6666
if (txn != null) {
6767
txn.close();
6868
}
69+
if (session != null) {
70+
session.onTransactionDone();
71+
}
6972
return MoreObjects.firstNonNull(res, ApiFutures.immediateFuture(null));
7073
}
7174

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ class DatabaseClientImpl implements DatabaseClient {
3636
@VisibleForTesting final String clientId;
3737
@VisibleForTesting final SessionPool pool;
3838
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
39+
@VisibleForTesting final boolean useMultiplexedSessionForRW;
3940

4041
final boolean useMultiplexedSessionBlindWrite;
4142

@@ -46,7 +47,8 @@ class DatabaseClientImpl implements DatabaseClient {
4647
pool,
4748
/* useMultiplexedSessionBlindWrite = */ false,
4849
/* multiplexedSessionDatabaseClient = */ null,
49-
tracer);
50+
tracer,
51+
/* useMultiplexedSessionForRW = */ false);
5052
}
5153

5254
@VisibleForTesting
@@ -56,20 +58,23 @@ class DatabaseClientImpl implements DatabaseClient {
5658
pool,
5759
/* useMultiplexedSessionBlindWrite = */ false,
5860
/* multiplexedSessionDatabaseClient = */ null,
59-
tracer);
61+
tracer,
62+
false);
6063
}
6164

6265
DatabaseClientImpl(
6366
String clientId,
6467
SessionPool pool,
6568
boolean useMultiplexedSessionBlindWrite,
6669
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
67-
TraceWrapper tracer) {
70+
TraceWrapper tracer,
71+
boolean useMultiplexedSessionForRW) {
6872
this.clientId = clientId;
6973
this.pool = pool;
7074
this.useMultiplexedSessionBlindWrite = useMultiplexedSessionBlindWrite;
7175
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
7276
this.tracer = tracer;
77+
this.useMultiplexedSessionForRW = useMultiplexedSessionForRW;
7378
}
7479

7580
@VisibleForTesting
@@ -85,6 +90,14 @@ DatabaseClient getMultiplexedSession() {
8590
return pool.getMultiplexedSessionWithFallback();
8691
}
8792

93+
@VisibleForTesting
94+
DatabaseClient getMultiplexedSessionForRW() {
95+
if (this.useMultiplexedSessionForRW) {
96+
return getMultiplexedSession();
97+
}
98+
return getSession();
99+
}
100+
88101
private MultiplexedSessionDatabaseClient getMultiplexedSessionDatabaseClient() {
89102
return canUseMultiplexedSessions() ? this.multiplexedSessionDatabaseClient : null;
90103
}
@@ -116,6 +129,9 @@ public CommitResponse writeWithOptions(
116129
throws SpannerException {
117130
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
118131
try (IScope s = tracer.withSpan(span)) {
132+
if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
133+
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
134+
}
119135
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
120136
} catch (RuntimeException e) {
121137
span.setStatus(e);
@@ -241,7 +257,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
241257
public TransactionRunner readWriteTransaction(TransactionOption... options) {
242258
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
243259
try (IScope s = tracer.withSpan(span)) {
244-
return getSession().readWriteTransaction(options);
260+
return getMultiplexedSessionForRW().readWriteTransaction(options);
245261
} catch (RuntimeException e) {
246262
span.setStatus(e);
247263
span.end();
@@ -253,7 +269,7 @@ public TransactionRunner readWriteTransaction(TransactionOption... options) {
253269
public TransactionManager transactionManager(TransactionOption... options) {
254270
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
255271
try (IScope s = tracer.withSpan(span)) {
256-
return getSession().transactionManager(options);
272+
return getMultiplexedSessionForRW().transactionManager(options);
257273
} catch (RuntimeException e) {
258274
span.setStatus(e);
259275
span.end();
@@ -265,7 +281,7 @@ public TransactionManager transactionManager(TransactionOption... options) {
265281
public AsyncRunner runAsync(TransactionOption... options) {
266282
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
267283
try (IScope s = tracer.withSpan(span)) {
268-
return getSession().runAsync(options);
284+
return getMultiplexedSessionForRW().runAsync(options);
269285
} catch (RuntimeException e) {
270286
span.setStatus(e);
271287
span.end();
@@ -277,7 +293,7 @@ public AsyncRunner runAsync(TransactionOption... options) {
277293
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
278294
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
279295
try (IScope s = tracer.withSpan(span)) {
280-
return getSession().transactionManagerAsync(options);
296+
return getMultiplexedSessionForRW().transactionManagerAsync(options);
281297
} catch (RuntimeException e) {
282298
span.setStatus(e);
283299
span.end();
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutures;
21+
import com.google.cloud.Timestamp;
22+
import com.google.common.util.concurrent.MoreExecutors;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.Executor;
25+
26+
/**
27+
* Represents a {@link AsyncRunner} using a multiplexed session that is not yet ready. The execution
28+
* will be delayed until the multiplexed session has been created and is ready. This class is only
29+
* used during the startup of the client and the multiplexed session has not yet been created.
30+
*/
31+
public class DelayedAsyncRunner implements AsyncRunner {
32+
33+
private final ApiFuture<AsyncRunner> asyncRunnerFuture;
34+
35+
public DelayedAsyncRunner(ApiFuture<AsyncRunner> asyncRunnerFuture) {
36+
this.asyncRunnerFuture = asyncRunnerFuture;
37+
}
38+
39+
ApiFuture<AsyncRunner> getAsyncRunner() {
40+
return ApiFutures.catchingAsync(
41+
asyncRunnerFuture,
42+
Exception.class,
43+
exception -> {
44+
if (exception instanceof InterruptedException) {
45+
throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) exception);
46+
}
47+
if (exception instanceof ExecutionException) {
48+
throw SpannerExceptionFactory.causeAsRunTimeException((ExecutionException) exception);
49+
}
50+
throw exception;
51+
},
52+
MoreExecutors.directExecutor());
53+
}
54+
55+
@Override
56+
public <R> ApiFuture<R> runAsync(AsyncWork<R> work, Executor executor) {
57+
return ApiFutures.transformAsync(
58+
getAsyncRunner(),
59+
asyncRunner -> asyncRunner.runAsync(work, executor),
60+
MoreExecutors.directExecutor());
61+
}
62+
63+
@Override
64+
public ApiFuture<Timestamp> getCommitTimestamp() {
65+
return ApiFutures.transformAsync(
66+
getAsyncRunner(), AsyncRunner::getCommitTimestamp, MoreExecutors.directExecutor());
67+
}
68+
69+
@Override
70+
public ApiFuture<CommitResponse> getCommitResponse() {
71+
return ApiFutures.transformAsync(
72+
getAsyncRunner(), AsyncRunner::getCommitResponse, MoreExecutors.directExecutor());
73+
}
74+
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.cloud.spanner.TransactionManager.TransactionState;
21+
import java.util.concurrent.ExecutionException;
22+
23+
/**
24+
* Represents a {@link AsyncTransactionManager} using a multiplexed session that is not yet ready.
25+
* The execution will be delayed until the multiplexed session has been created and is ready. This
26+
* class is only used during the startup of the client and the multiplexed session has not yet been
27+
* created.
28+
*/
29+
public class DelayedAsyncTransactionManager implements AsyncTransactionManager {
30+
31+
private final ApiFuture<AsyncTransactionManager> asyncTransactionManagerApiFuture;
32+
33+
DelayedAsyncTransactionManager(
34+
ApiFuture<AsyncTransactionManager> asyncTransactionManagerApiFuture) {
35+
this.asyncTransactionManagerApiFuture = asyncTransactionManagerApiFuture;
36+
}
37+
38+
AsyncTransactionManager getAsyncTransactionManager() {
39+
try {
40+
return this.asyncTransactionManagerApiFuture.get();
41+
} catch (ExecutionException executionException) {
42+
throw SpannerExceptionFactory.causeAsRunTimeException(executionException);
43+
} catch (InterruptedException interruptedException) {
44+
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
45+
}
46+
}
47+
48+
@Override
49+
public TransactionContextFuture beginAsync() {
50+
return getAsyncTransactionManager().beginAsync();
51+
}
52+
53+
@Override
54+
public ApiFuture<Void> rollbackAsync() {
55+
return getAsyncTransactionManager().rollbackAsync();
56+
}
57+
58+
@Override
59+
public TransactionContextFuture resetForRetryAsync() {
60+
return getAsyncTransactionManager().resetForRetryAsync();
61+
}
62+
63+
@Override
64+
public TransactionState getState() {
65+
return getAsyncTransactionManager().getState();
66+
}
67+
68+
@Override
69+
public ApiFuture<CommitResponse> getCommitResponse() {
70+
return getAsyncTransactionManager().getCommitResponse();
71+
}
72+
73+
@Override
74+
public void close() {
75+
getAsyncTransactionManager().close();
76+
}
77+
78+
@Override
79+
public ApiFuture<Void> closeAsync() {
80+
return getAsyncTransactionManager().closeAsync();
81+
}
82+
}

0 commit comments

Comments
 (0)