Skip to content

Commit b2795a7

Browse files
refactor: move multiplexed session handling to separate class (#3063)
* chore: try with random channel hint * chore: add option for random channel * chore: actually use random channel option * chore: only lock the specific wrapper * chore: simplify creation and assignment * chore: make more variables final * chore: use separate pool * chore: use a separate mux client * chore: make init blocking * chore: disable pending tx check * chore: add call durations to client lib * chore: add call_durations * chore: use session pool for mux session * chore: use mux database client * chore: make mux client optional * refactor: move multiplexed session handling to separate class * chore: cleanup * feat: add maintainer * chore: add more tests * chore: fix test failures * fix: ChannelUsageTest should keep session in use for longer * test: skip ChannelUsageTest in all cases * chore: keep track of DatabaseDeleted errors * fix: freeze server to prevent flakiness * fix: freeze server to prevent flakiness * test: add retry loop for second query * chore: remove print statements and add emulator handling in test * test: add tests for maintainer * chore: register SessionConsumer for reflection * chore: skip test as it fails on native build Just skip the entire test, as the scenario is already covered by other tests, and spending time on trying to figure out what is happening in this failure is just a waste of time. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: add random channel hint as option * chore: add single-use channel hint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: single-use hint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: use next available channel * chore: keep track of num transactions and channels in use * chore: remove println * chore: remove option for using session pool for mux * feat: add UNIMPLEMENTED handler * chore: cleanup * chore: add TODO for removing the unimplemented handling * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore: run formatter * test: fix flaky tests Fixes #3050 Fixes #3081 Fixes #3080 --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 0894893 commit b2795a7

30 files changed

+1727
-162
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.gax.rpc.ServerStream;
20+
import com.google.cloud.Timestamp;
21+
import com.google.cloud.spanner.Options.TransactionOption;
22+
import com.google.cloud.spanner.Options.UpdateOption;
23+
import com.google.spanner.v1.BatchWriteResponse;
24+
25+
/**
26+
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
27+
* UnsupportedOperationException} for all methods that are currently not supported for multiplexed
28+
* sessions. The concrete implementation implements the methods that are supported with multiplexed
29+
* sessions.
30+
*/
31+
abstract class AbstractMultiplexedSessionDatabaseClient implements DatabaseClient {
32+
33+
@Override
34+
public Dialect getDialect() {
35+
throw new UnsupportedOperationException();
36+
}
37+
38+
@Override
39+
public String getDatabaseRole() {
40+
throw new UnsupportedOperationException();
41+
}
42+
43+
@Override
44+
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
45+
throw new UnsupportedOperationException();
46+
}
47+
48+
@Override
49+
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
50+
throws SpannerException {
51+
throw new UnsupportedOperationException();
52+
}
53+
54+
@Override
55+
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
56+
throw new UnsupportedOperationException();
57+
}
58+
59+
@Override
60+
public CommitResponse writeAtLeastOnceWithOptions(
61+
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
62+
throw new UnsupportedOperationException();
63+
}
64+
65+
@Override
66+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
67+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
68+
throws SpannerException {
69+
throw new UnsupportedOperationException();
70+
}
71+
72+
@Override
73+
public TransactionRunner readWriteTransaction(TransactionOption... options) {
74+
throw new UnsupportedOperationException();
75+
}
76+
77+
@Override
78+
public TransactionManager transactionManager(TransactionOption... options) {
79+
throw new UnsupportedOperationException();
80+
}
81+
82+
@Override
83+
public AsyncRunner runAsync(TransactionOption... options) {
84+
throw new UnsupportedOperationException();
85+
}
86+
87+
@Override
88+
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
89+
throw new UnsupportedOperationException();
90+
}
91+
92+
@Override
93+
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
94+
throw new UnsupportedOperationException();
95+
}
96+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -844,11 +844,14 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude
844844

845845
@Override
846846
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
847+
this.session.onError(e);
847848
return e;
848849
}
849850

850851
@Override
851-
public void onDone(boolean withBeginTransaction) {}
852+
public void onDone(boolean withBeginTransaction) {
853+
this.session.onReadDone();
854+
}
852855

853856
private ResultSet readInternal(
854857
String table,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.spanner.Options.TransactionOption;
2222
import com.google.cloud.spanner.Options.UpdateOption;
2323
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
24-
import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
2524
import com.google.cloud.spanner.SpannerImpl.ClosedException;
2625
import com.google.common.annotations.VisibleForTesting;
2726
import com.google.common.base.Function;
@@ -36,15 +35,26 @@ class DatabaseClientImpl implements DatabaseClient {
3635
private final TraceWrapper tracer;
3736
@VisibleForTesting final String clientId;
3837
@VisibleForTesting final SessionPool pool;
38+
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
3939

4040
@VisibleForTesting
4141
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
42-
this("", pool, tracer);
42+
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
4343
}
4444

45+
@VisibleForTesting
4546
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
47+
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
48+
}
49+
50+
DatabaseClientImpl(
51+
String clientId,
52+
SessionPool pool,
53+
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
54+
TraceWrapper tracer) {
4655
this.clientId = clientId;
4756
this.pool = pool;
57+
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
4858
this.tracer = tracer;
4959
}
5060

@@ -54,7 +64,11 @@ PooledSessionFuture getSession() {
5464
}
5565

5666
@VisibleForTesting
57-
SessionFutureWrapper getMultiplexedSession() {
67+
DatabaseClient getMultiplexedSession() {
68+
if (this.multiplexedSessionDatabaseClient != null
69+
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) {
70+
return this.multiplexedSessionDatabaseClient;
71+
}
5872
return pool.getMultiplexedSessionWithFallback();
5973
}
6074

@@ -270,7 +284,18 @@ private <T> T runWithSessionRetry(Function<Session, T> callable) {
270284
}
271285
}
272286

287+
boolean isValid() {
288+
return pool.isValid()
289+
&& (multiplexedSessionDatabaseClient == null
290+
|| multiplexedSessionDatabaseClient.isValid()
291+
|| !multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported());
292+
}
293+
273294
ListenableFuture<Void> closeAsync(ClosedException closedException) {
295+
if (this.multiplexedSessionDatabaseClient != null) {
296+
// This method is non-blocking.
297+
this.multiplexedSessionDatabaseClient.close();
298+
}
274299
return pool.closeAsync(closedException);
275300
}
276301
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
20+
21+
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
23+
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
24+
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
25+
import com.google.common.util.concurrent.MoreExecutors;
26+
27+
/**
28+
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
29+
* delayed because the multiplexed session is not yet ready. This class is only used during client
30+
* creation before the multiplexed session has been created. The use of this class while the
31+
* multiplexed session is still being created ensures that the creation of a {@link DatabaseClient}
32+
* is non-blocking.
33+
*/
34+
class DelayedMultiplexedSessionTransaction extends AbstractMultiplexedSessionDatabaseClient {
35+
36+
private final MultiplexedSessionDatabaseClient client;
37+
38+
private final ISpan span;
39+
40+
private final ApiFuture<SessionReference> sessionFuture;
41+
42+
DelayedMultiplexedSessionTransaction(
43+
MultiplexedSessionDatabaseClient client,
44+
ISpan span,
45+
ApiFuture<SessionReference> sessionFuture) {
46+
this.client = client;
47+
this.span = span;
48+
this.sessionFuture = sessionFuture;
49+
}
50+
51+
@Override
52+
public ReadContext singleUse() {
53+
return new DelayedReadContext<>(
54+
ApiFutures.transform(
55+
this.sessionFuture,
56+
sessionReference ->
57+
new MultiplexedSessionTransaction(
58+
client, span, sessionReference, NO_CHANNEL_HINT, true)
59+
.singleUse(),
60+
MoreExecutors.directExecutor()));
61+
}
62+
63+
@Override
64+
public ReadContext singleUse(TimestampBound bound) {
65+
return new DelayedReadContext<>(
66+
ApiFutures.transform(
67+
this.sessionFuture,
68+
sessionReference ->
69+
new MultiplexedSessionTransaction(
70+
client, span, sessionReference, NO_CHANNEL_HINT, true)
71+
.singleUse(bound),
72+
MoreExecutors.directExecutor()));
73+
}
74+
75+
@Override
76+
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
77+
return new DelayedReadOnlyTransaction(
78+
ApiFutures.transform(
79+
this.sessionFuture,
80+
sessionReference ->
81+
new MultiplexedSessionTransaction(
82+
client, span, sessionReference, NO_CHANNEL_HINT, true)
83+
.singleUseReadOnlyTransaction(),
84+
MoreExecutors.directExecutor()));
85+
}
86+
87+
@Override
88+
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
89+
return new DelayedReadOnlyTransaction(
90+
ApiFutures.transform(
91+
this.sessionFuture,
92+
sessionReference ->
93+
new MultiplexedSessionTransaction(
94+
client, span, sessionReference, NO_CHANNEL_HINT, true)
95+
.singleUseReadOnlyTransaction(bound),
96+
MoreExecutors.directExecutor()));
97+
}
98+
99+
@Override
100+
public ReadOnlyTransaction readOnlyTransaction() {
101+
return new DelayedReadOnlyTransaction(
102+
ApiFutures.transform(
103+
this.sessionFuture,
104+
sessionReference ->
105+
new MultiplexedSessionTransaction(
106+
client, span, sessionReference, NO_CHANNEL_HINT, false)
107+
.readOnlyTransaction(),
108+
MoreExecutors.directExecutor()));
109+
}
110+
111+
@Override
112+
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
113+
return new DelayedReadOnlyTransaction(
114+
ApiFutures.transform(
115+
this.sessionFuture,
116+
sessionReference ->
117+
new MultiplexedSessionTransaction(
118+
client, span, sessionReference, NO_CHANNEL_HINT, false)
119+
.readOnlyTransaction(bound),
120+
MoreExecutors.directExecutor()));
121+
}
122+
}

0 commit comments

Comments
 (0)