Skip to content

chore: add a random hint for multi-use transactions when they are use… #3058

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-spanner'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.64.0'
implementation 'com.google.cloud:google-cloud-spanner:6.65.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.64.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.65.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -650,7 +650,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.64.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.65.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SessionClient.optionMap;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -32,6 +33,7 @@
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.SessionClient.SessionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
Expand All @@ -52,6 +54,7 @@
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
Expand Down Expand Up @@ -180,9 +183,15 @@ static Builder newBuilder() {
@GuardedBy("lock")
private boolean used;

private final Map<SpannerRpc.Option, ?> channelHint;

private SingleReadContext(Builder builder) {
super(builder);
this.bound = builder.bound;
// single use transaction have a single RPC and hence there is no need
// of a channel hint. GAX will automatically choose a hint when used
// with a multiplexed session.
this.channelHint = getChannelHintOptions(session.getOptions(), null);
}

@Override
Expand All @@ -209,6 +218,11 @@ TransactionSelector getTransactionSelector() {
.setSingleUse(TransactionOptions.newBuilder().setReadOnly(bound.toProto()))
.build();
}

@Override
Map<SpannerRpc.Option, ?> getTransactionChannelHint() {
return channelHint;
}
}

private static void assertTimestampAvailable(boolean available) {
Expand All @@ -217,6 +231,7 @@ private static void assertTimestampAvailable(boolean available) {

static class SingleUseReadOnlyTransaction extends SingleReadContext
implements ReadOnlyTransaction {

@GuardedBy("lock")
private Timestamp timestamp;

Expand Down Expand Up @@ -300,6 +315,8 @@ static Builder newBuilder() {
@GuardedBy("txnLock")
private ByteString transactionId;

private final Map<SpannerRpc.Option, ?> channelHint;

MultiUseReadOnlyTransaction(Builder builder) {
super(builder);
checkArgument(
Expand All @@ -318,6 +335,14 @@ static Builder newBuilder() {
this.timestamp = builder.timestamp;
this.transactionId = builder.transactionId;
}
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
}

@Override
public Map<SpannerRpc.Option, ?> getTransactionChannelHint() {
return channelHint;
}

@Override
Expand Down Expand Up @@ -380,7 +405,7 @@ void initTransaction() {
.setOptions(options)
.build();
Transaction transaction =
rpc.beginTransaction(request, session.getOptions(), isRouteToLeader());
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
Expand Down Expand Up @@ -727,7 +752,10 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
request.build(),
stream.consumer(),
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, request.getTransaction().hasBegin());
Expand All @@ -738,6 +766,16 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
}

Map<SpannerRpc.Option, ?> getChannelHintOptions(
Map<SpannerRpc.Option, ?> channelHintForSession, Long channelHintForTransaction) {
if (channelHintForSession != null) {
return channelHintForSession;
} else if (channelHintForTransaction != null) {
return optionMap(SessionOption.channelHint(channelHintForTransaction));
}
return null;
}

/**
* Called before any read or query is started to perform state checks and initializations.
* Subclasses should call {@code super.beforeReadOrQuery()} if overriding.
Expand Down Expand Up @@ -782,6 +820,12 @@ public void close() {
@Nullable
abstract TransactionSelector getTransactionSelector();

/**
* Channel hint to be used for a transaction. This enables soft-stickiness per transaction by
* ensuring all RPCs within a transaction land up on the same channel.
*/
abstract Map<SpannerRpc.Option, ?> getTransactionChannelHint();

/**
* Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
* {@link AbstractReadContext} does not have a transaction tag.
Expand Down Expand Up @@ -872,7 +916,10 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
builder.setRequestOptions(buildRequestOptions(readOptions));
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
isRouteToLeader());
session.markUsed(clock.instant());
call.request(prefetchChunks);
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -51,11 +53,13 @@
import com.google.spanner.v1.TransactionSelector;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -198,13 +202,18 @@ public void removeListener(Runnable listener) {
private CommitResponse commitResponse;
private final Clock clock;

private final Map<SpannerRpc.Option, ?> channelHint;

private TransactionContextImpl(Builder builder) {
super(builder);
this.transactionId = builder.transactionId;
this.trackTransactionStarter = builder.trackTransactionStarter;
this.options = builder.options;
this.finishedAsyncOperations.set(null);
this.clock = builder.clock;
this.channelHint =
getChannelHintOptions(
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
}

@Override
Expand Down Expand Up @@ -559,6 +568,11 @@ TransactionSelector getTransactionSelector() {
return TransactionSelector.newBuilder().setId(transactionId).build();
}

@Override
Map<Option, ?> getTransactionChannelHint() {
return channelHint;
}

@Override
public void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) {
Preconditions.checkNotNull(transaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas;
import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection;
Expand All @@ -39,6 +40,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -98,6 +100,11 @@ private final class TestReadContext extends AbstractReadContext {
TransactionSelector getTransactionSelector() {
return TransactionSelector.getDefaultInstance();
}

@Override
Map<SpannerRpc.Option, ?> getTransactionChannelHint() {
return null;
}
}

private final class TestReadContextWithTag extends AbstractReadContext {
Expand All @@ -110,6 +117,11 @@ TransactionSelector getTransactionSelector() {
return TransactionSelector.getDefaultInstance();
}

@Override
Map<Option, ?> getTransactionChannelHint() {
return null;
}

String getTransactionTag() {
return "app=spanner,env=test";
}
Expand Down
Loading