Skip to content

feat: support isolation level REPEATABLE_READ for R/W transactions #3670

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 17, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ private ResultSet executeQueryInternal(
* <li>Specific {@link QueryOptions} passed in for this query.
* <li>Any value specified in a valid environment variable when the {@link SpannerOptions}
* instance was created.
* <li>The default {@link SpannerOptions#getDefaultQueryOptions()} specified for the database
* where the query is executed.
* <li>The default {@link SpannerOptions#getDefaultQueryOptions(DatabaseId)} ()} specified for
* the database where the query is executed.
* </ol>
*/
@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;

/**
* Interface for all the APIs that are used to read/write data into a Cloud Spanner database. An
Expand Down Expand Up @@ -414,6 +415,7 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction
* </ul>
*/
TransactionRunner readWriteTransaction(TransactionOption... options);
Expand Down Expand Up @@ -454,6 +456,7 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction
* </ul>
*/
TransactionManager transactionManager(TransactionOption... options);
Expand Down Expand Up @@ -494,6 +497,7 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction
* </ul>
*/
AsyncRunner runAsync(TransactionOption... options);
Expand Down Expand Up @@ -548,6 +552,7 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
* applied to any other requests on the transaction.
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
* {@link CommitResponse}.
* <li>{@link Options#isolationLevel(IsolationLevel)}: The isolation level for the transaction
* </ul>
*/
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.spanner.v1.ReadRequest.LockHint;
import com.google.spanner.v1.ReadRequest.OrderBy;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
Expand Down Expand Up @@ -159,6 +160,13 @@ public static TransactionOption optimisticLock() {
return OPTIMISTIC_LOCK_OPTION;
}

/**
* Specifying this instructs the transaction to request {@link IsolationLevel} from the backend.
*/
public static TransactionOption isolationLevel(IsolationLevel isolationLevel) {
return new IsolationLevelOption(isolationLevel);
}

/**
* Specifying this instructs the transaction to be excluded from being recorded in change streams
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
Expand Down Expand Up @@ -490,6 +498,20 @@ void appendToOptions(Options options) {
}
}

/** Option to set isolation level for read/write transactions. */
static final class IsolationLevelOption extends InternalOption implements TransactionOption {
private final IsolationLevel isolationLevel;

public IsolationLevelOption(IsolationLevel isolationLevel) {
this.isolationLevel = isolationLevel;
}

@Override
void appendToOptions(Options options) {
options.isolationLevel = isolationLevel;
}
}

private boolean withCommitStats;

private Duration maxCommitDelay;
Expand All @@ -512,6 +534,7 @@ void appendToOptions(Options options) {
private RpcOrderBy orderBy;
private RpcLockHint lockHint;
private Boolean lastStatement;
private IsolationLevel isolationLevel;

// Construction is via factory methods below.
private Options() {}
Expand Down Expand Up @@ -664,6 +687,10 @@ LockHint lockHint() {
return lockHint == null ? null : lockHint.proto;
}

IsolationLevel isolationLevel() {
return isolationLevel;
}

@Override
public String toString() {
StringBuilder b = new StringBuilder();
Expand Down Expand Up @@ -726,6 +753,9 @@ public String toString() {
if (lockHint != null) {
b.append("lockHint: ").append(lockHint).append(' ');
}
if (isolationLevel != null) {
b.append("isolationLevel: ").append(isolationLevel).append(' ');
}
return b.toString();
}

Expand Down Expand Up @@ -767,7 +797,8 @@ public boolean equals(Object o) {
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
&& Objects.equals(orderBy(), that.orderBy())
&& Objects.equals(isLastStatement(), that.isLastStatement())
&& Objects.equals(lockHint(), that.lockHint());
&& Objects.equals(lockHint(), that.lockHint())
&& Objects.equals(isolationLevel(), that.isolationLevel());
}

@Override
Expand Down Expand Up @@ -833,6 +864,9 @@ public int hashCode() {
if (lockHint != null) {
result = 31 * result + lockHint.hashCode();
}
if (isolationLevel != null) {
result = 31 * result + isolationLevel.hashCode();
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ static TransactionOptions createReadWriteTransactionOptions(
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
}
if (options.isolationLevel() != null) {
transactionOptions.setIsolationLevel(options.isolationLevel());
}
transactionOptions.setReadWrite(readWrite);
return transactionOptions.build();
}
Expand Down Expand Up @@ -193,6 +196,10 @@ void markUsed(Instant instant) {
sessionReference.markUsed(instant);
}

TransactionOptions defaultTransactionOptions() {
return this.spanner.getOptions().getDefaultTransactionOptions();
}

public DatabaseId getDatabaseId() {
return sessionReference.getDatabaseId();
}
Expand Down Expand Up @@ -252,7 +259,11 @@ public CommitResponse writeAtLeastOnceWithOptions(
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
}
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);
if (options.isolationLevel() != null) {
transactionOptionsBuilder.setIsolationLevel(options.isolationLevel());
}
requestBuilder.setSingleUseTransaction(
defaultTransactionOptions().toBuilder().mergeFrom(transactionOptionsBuilder.build()));

if (options.hasMaxCommitDelay()) {
requestBuilder.setMaxCommitDelay(
Expand Down Expand Up @@ -444,7 +455,11 @@ ApiFuture<Transaction> beginTransactionAsync(
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
defaultTransactionOptions()
.toBuilder()
.mergeFrom(
createReadWriteTransactionOptions(
transactionOptions, previousTransactionId)));
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
Expand Down Expand Up @@ -489,7 +504,6 @@ TransactionContextImpl newTransaction(Options options, ByteString previousTransa
.setOptions(options)
.setTransactionId(null)
.setPreviousTransactionId(previousTransactionId)
.setOptions(options)
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
.setRpc(spanner.getRpc())
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
import io.grpc.CallCredentials;
import io.grpc.CompressorRegistry;
import io.grpc.Context;
Expand Down Expand Up @@ -178,6 +180,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final boolean enableExtendedTracing;
private final boolean enableEndToEndTracing;
private final String monitoringHost;
private final TransactionOptions defaultTransactionOptions;

enum TracingFramework {
OPEN_CENSUS,
Expand Down Expand Up @@ -807,6 +810,7 @@ protected SpannerOptions(Builder builder) {
enableBuiltInMetrics = builder.enableBuiltInMetrics;
enableEndToEndTracing = builder.enableEndToEndTracing;
monitoringHost = builder.monitoringHost;
defaultTransactionOptions = builder.defaultTransactionOptions;
}

/**
Expand Down Expand Up @@ -988,6 +992,7 @@ public static class Builder
private String monitoringHost = SpannerOptions.environment.getMonitoringHost();
private SslContext mTLSContext = null;
private boolean isExperimentalHost = false;
private TransactionOptions defaultTransactionOptions = TransactionOptions.getDefaultInstance();

private static String createCustomClientLibToken(String token) {
return token + " " + ServiceOptions.getGoogApiClientLibName();
Expand Down Expand Up @@ -1056,6 +1061,7 @@ protected Builder() {
this.enableBuiltInMetrics = options.enableBuiltInMetrics;
this.enableEndToEndTracing = options.enableEndToEndTracing;
this.monitoringHost = options.monitoringHost;
this.defaultTransactionOptions = options.defaultTransactionOptions;
}

@Override
Expand Down Expand Up @@ -1645,6 +1651,55 @@ public Builder setEnableEndToEndTracing(boolean enableEndToEndTracing) {
return this;
}

/**
* Provides the default read-write transaction options for all databases. These defaults are
* overridden by any explicit {@link com.google.cloud.spanner.Options.TransactionOption}
* provided through {@link DatabaseClient}.
*
* <p>Example Usage:
*
* <pre>{@code
* DefaultReadWriteTransactionOptions options = DefaultReadWriteTransactionOptions.newBuilder()
* .setIsolationLevel(IsolationLevel.SERIALIZABLE)
* .build();
* }</pre>
*/
public static class DefaultReadWriteTransactionOptions {
private final TransactionOptions defaultTransactionOptions;

private DefaultReadWriteTransactionOptions(TransactionOptions defaultTransactionOptions) {
this.defaultTransactionOptions = defaultTransactionOptions;
}

public static DefaultReadWriteTransactionOptionsBuilder newBuilder() {
return new DefaultReadWriteTransactionOptionsBuilder();
}

public static class DefaultReadWriteTransactionOptionsBuilder {
private final TransactionOptions.Builder transactionOptionsBuilder =
TransactionOptions.newBuilder();

public DefaultReadWriteTransactionOptionsBuilder setIsolationLevel(
IsolationLevel isolationLevel) {
transactionOptionsBuilder.setIsolationLevel(isolationLevel);
return this;
}

public DefaultReadWriteTransactionOptions build() {
return new DefaultReadWriteTransactionOptions(transactionOptionsBuilder.build());
}
}
}

/** Sets the {@link DefaultReadWriteTransactionOptions} for read-write transactions. */
public Builder setDefaultTransactionOptions(
DefaultReadWriteTransactionOptions defaultReadWriteTransactionOptions) {
Preconditions.checkNotNull(
defaultReadWriteTransactionOptions, "DefaultReadWriteTransactionOptions cannot be null");
this.defaultTransactionOptions = defaultReadWriteTransactionOptions.defaultTransactionOptions;
return this;
}

@SuppressWarnings("rawtypes")
@Override
public SpannerOptions build() {
Expand Down Expand Up @@ -1990,6 +2045,10 @@ String getMonitoringHost() {
return monitoringHost;
}

public TransactionOptions getDefaultTransactionOptions() {
return defaultTransactionOptions;
}

@BetaApi
public boolean isUseVirtualThreads() {
return useVirtualThreads;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -643,8 +643,12 @@ TransactionSelector getTransactionSelector() {
if (tx == null) {
return TransactionSelector.newBuilder()
.setBegin(
SessionImpl.createReadWriteTransactionOptions(
options, getPreviousTransactionId()))
this.session
.defaultTransactionOptions()
.toBuilder()
.mergeFrom(
SessionImpl.createReadWriteTransactionOptions(
options, getPreviousTransactionId())))
.build();
} else {
// Wait for the transaction to come available. The tx.get() call will fail with an
Expand Down
Loading
Loading