Skip to content

refactor: move multiplexed session handling to separate class #3063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 52 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
e2dde6f
chore: try with random channel hint
olavloite Apr 24, 2024
e4b585f
chore: add option for random channel
olavloite Apr 24, 2024
0cf711d
chore: actually use random channel option
olavloite Apr 24, 2024
fa1c7a1
chore: only lock the specific wrapper
olavloite Apr 24, 2024
66763e5
chore: simplify creation and assignment
olavloite Apr 24, 2024
78fe4f7
chore: make more variables final
olavloite Apr 24, 2024
1ea4c28
Merge branch 'main' into mux-benchmark-experiments
olavloite Apr 24, 2024
3eab7fa
chore: use separate pool
olavloite Apr 24, 2024
571f8a6
chore: use a separate mux client
olavloite Apr 24, 2024
ef9939b
chore: make init blocking
olavloite Apr 25, 2024
7afe362
chore: disable pending tx check
olavloite Apr 25, 2024
3e7a6e4
chore: add call durations to client lib
olavloite Apr 25, 2024
648fe58
chore: add call_durations
olavloite Apr 25, 2024
0359280
chore: use session pool for mux session
olavloite Apr 25, 2024
14f9fc9
chore: use mux database client
olavloite Apr 25, 2024
7947517
chore: make mux client optional
olavloite Apr 25, 2024
b17215d
refactor: move multiplexed session handling to separate class
olavloite Apr 25, 2024
5cb343b
chore: cleanup
olavloite Apr 25, 2024
f7394d1
feat: add maintainer
olavloite Apr 25, 2024
6260221
chore: add more tests
olavloite Apr 26, 2024
92795df
chore: fix test failures
olavloite Apr 26, 2024
9162a2b
fix: ChannelUsageTest should keep session in use for longer
olavloite Apr 26, 2024
b71ef15
test: skip ChannelUsageTest in all cases
olavloite Apr 26, 2024
e027e2e
chore: keep track of DatabaseDeleted errors
olavloite Apr 26, 2024
8258ce7
fix: freeze server to prevent flakiness
olavloite Apr 26, 2024
9bbb402
fix: freeze server to prevent flakiness
olavloite Apr 26, 2024
ae426fa
test: add retry loop for second query
olavloite Apr 26, 2024
4639c3b
chore: remove print statements and add emulator handling in test
olavloite Apr 28, 2024
306a063
test: add tests for maintainer
olavloite Apr 28, 2024
f5323fb
chore: register SessionConsumer for reflection
olavloite Apr 28, 2024
e5079f3
chore: skip test as it fails on native build
olavloite Apr 28, 2024
9e3a3f5
Merge branch 'main' into mux-benchmark-experiments
olavloite Apr 30, 2024
fe27c6a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 30, 2024
16b3578
chore: add random channel hint as option
olavloite Apr 30, 2024
7a042fc
Merge branch 'mux-benchmark-experiments' of github.com:googleapis/jav…
olavloite Apr 30, 2024
00088a5
chore: add single-use channel hint
olavloite Apr 30, 2024
2ea7c0a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 30, 2024
b26ed58
chore: single-use hint
olavloite Apr 30, 2024
f3b1519
Merge branch 'mux-benchmark-experiments' of github.com:googleapis/jav…
olavloite Apr 30, 2024
c39148e
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Apr 30, 2024
45b2c0d
chore: use next available channel
olavloite Apr 30, 2024
3cacd70
Merge branch 'mux-benchmark-experiments' of github.com:googleapis/jav…
olavloite Apr 30, 2024
9cac61a
chore: keep track of num transactions and channels in use
olavloite May 1, 2024
5bc5040
chore: remove println
olavloite May 1, 2024
807120d
chore: remove option for using session pool for mux
olavloite May 1, 2024
55daa50
feat: add UNIMPLEMENTED handler
olavloite May 1, 2024
5979f2d
chore: cleanup
olavloite May 1, 2024
9d6845c
chore: add TODO for removing the unimplemented handling
olavloite May 1, 2024
00c9a44
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] May 1, 2024
b3aee68
chore: run formatter
olavloite May 1, 2024
d18b834
Merge branch 'mux-benchmark-experiments' of github.com:googleapis/jav…
olavloite May 1, 2024
794cbd8
test: fix flaky tests
olavloite May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,20 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.37.0')
implementation platform('com.google.cloud:libraries-bom:26.38.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.65.0'
implementation 'com.google.cloud:google-cloud-spanner:6.65.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.65.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.65.1"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -651,7 +651,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.65.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.65.1
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;

/**
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
* UnsupportedOperationException} for all methods that are currently not supported for multiplexed
* sessions. The concrete implementation implements the methods that are supported with multiplexed
* sessions.
*/
abstract class AbstractMultiplexedSessionDatabaseClient implements DatabaseClient {

@Override
public Dialect getDialect() {
throw new UnsupportedOperationException();
}

@Override
public String getDatabaseRole() {
throw new UnsupportedOperationException();
}

@Override
public Timestamp write(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption... options)
throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public CommitResponse writeAtLeastOnceWithOptions(
Iterable<Mutation> mutations, TransactionOption... options) throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
throws SpannerException {
throw new UnsupportedOperationException();
}

@Override
public TransactionRunner readWriteTransaction(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public TransactionManager transactionManager(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public AsyncRunner runAsync(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public AsyncTransactionManager transactionManagerAsync(TransactionOption... options) {
throw new UnsupportedOperationException();
}

@Override
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -844,11 +844,14 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude

@Override
public SpannerException onError(SpannerException e, boolean withBeginTransaction) {
this.session.onError(e);
return e;
}

@Override
public void onDone(boolean withBeginTransaction) {}
public void onDone(boolean withBeginTransaction) {
this.session.onReadDone();
}

private ResultSet readInternal(
String table,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionPool.PooledSessionFuture;
import com.google.cloud.spanner.SessionPool.SessionFutureWrapper;
import com.google.cloud.spanner.SpannerImpl.ClosedException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
Expand All @@ -36,15 +35,26 @@ class DatabaseClientImpl implements DatabaseClient {
private final TraceWrapper tracer;
@VisibleForTesting final String clientId;
@VisibleForTesting final SessionPool pool;
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;

@VisibleForTesting
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
this("", pool, tracer);
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
}

@VisibleForTesting
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
}

DatabaseClientImpl(
String clientId,
SessionPool pool,
@Nullable MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient,
TraceWrapper tracer) {
this.clientId = clientId;
this.pool = pool;
this.multiplexedSessionDatabaseClient = multiplexedSessionDatabaseClient;
this.tracer = tracer;
}

Expand All @@ -54,7 +64,11 @@ PooledSessionFuture getSession() {
}

@VisibleForTesting
SessionFutureWrapper getMultiplexedSession() {
DatabaseClient getMultiplexedSession() {
if (this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported()) {
return this.multiplexedSessionDatabaseClient;
}
return pool.getMultiplexedSessionWithFallback();
}

Expand Down Expand Up @@ -270,7 +284,18 @@ private <T> T runWithSessionRetry(Function<Session, T> callable) {
}
}

boolean isValid() {
return pool.isValid()
&& (multiplexedSessionDatabaseClient == null
|| multiplexedSessionDatabaseClient.isValid()
|| !multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported());
}

ListenableFuture<Void> closeAsync(ClosedException closedException) {
if (this.multiplexedSessionDatabaseClient != null) {
// This method is non-blocking.
this.multiplexedSessionDatabaseClient.close();
}
return pool.closeAsync(closedException);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
import com.google.common.util.concurrent.MoreExecutors;

/**
* Represents a delayed execution of a transaction on a multiplexed session. The execution is
* delayed because the multiplexed session is not yet ready. This class is only used during client
* creation before the multiplexed session has been created. The use of this class while the
* multiplexed session is still being created ensures that the creation of a {@link DatabaseClient}
* is non-blocking.
*/
class DelayedMultiplexedSessionTransaction extends AbstractMultiplexedSessionDatabaseClient {

private final MultiplexedSessionDatabaseClient client;

private final ISpan span;

private final ApiFuture<SessionReference> sessionFuture;

DelayedMultiplexedSessionTransaction(
MultiplexedSessionDatabaseClient client,
ISpan span,
ApiFuture<SessionReference> sessionFuture) {
this.client = client;
this.span = span;
this.sessionFuture = sessionFuture;
}

@Override
public ReadContext singleUse() {
return new DelayedReadContext<>(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, true)
.singleUse(),
MoreExecutors.directExecutor()));
}

@Override
public ReadContext singleUse(TimestampBound bound) {
return new DelayedReadContext<>(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, true)
.singleUse(bound),
MoreExecutors.directExecutor()));
}

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction() {
return new DelayedReadOnlyTransaction(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, true)
.singleUseReadOnlyTransaction(),
MoreExecutors.directExecutor()));
}

@Override
public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
return new DelayedReadOnlyTransaction(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, true)
.singleUseReadOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}

@Override
public ReadOnlyTransaction readOnlyTransaction() {
return new DelayedReadOnlyTransaction(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, false)
.readOnlyTransaction(),
MoreExecutors.directExecutor()));
}

@Override
public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
return new DelayedReadOnlyTransaction(
ApiFutures.transform(
this.sessionFuture,
sessionReference ->
new MultiplexedSessionTransaction(
client, span, sessionReference, NO_CHANNEL_HINT, false)
.readOnlyTransaction(bound),
MoreExecutors.directExecutor()));
}
}
Loading