Skip to content

Commit 9e2c025

Browse files
committed
feat(spanner): Support REPEATABLE_READ for RW transaction
1 parent 8367bfa commit 9e2c025

File tree

8 files changed

+809
-8
lines changed

8 files changed

+809
-8
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClient.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,10 @@ default String getDatabaseRole() {
111111
* applied to any other requests on the transaction.
112112
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
113113
* {@link CommitResponse}.
114+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
115+
* from the backend.
116+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
117+
* the backend.
114118
* </ul>
115119
*
116120
* @return a response with the timestamp at which the write was committed
@@ -186,6 +190,10 @@ CommitResponse writeWithOptions(Iterable<Mutation> mutations, TransactionOption.
186190
* applied to any other requests on the transaction.
187191
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
188192
* {@link CommitResponse}.
193+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
194+
* from the backend.
195+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
196+
* the backend.
189197
* </ul>
190198
*
191199
* @return a response with the timestamp at which the write was committed
@@ -414,6 +422,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
414422
* applied to any other requests on the transaction.
415423
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
416424
* {@link CommitResponse}.
425+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
426+
* from the backend.
427+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
428+
* the backend.
417429
* </ul>
418430
*/
419431
TransactionRunner readWriteTransaction(TransactionOption... options);
@@ -454,6 +466,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
454466
* applied to any other requests on the transaction.
455467
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
456468
* {@link CommitResponse}.
469+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
470+
* from the backend.
471+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
472+
* the backend.
457473
* </ul>
458474
*/
459475
TransactionManager transactionManager(TransactionOption... options);
@@ -494,6 +510,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
494510
* applied to any other requests on the transaction.
495511
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
496512
* {@link CommitResponse}.
513+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
514+
* from the backend.
515+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
516+
* the backend.
497517
* </ul>
498518
*/
499519
AsyncRunner runAsync(TransactionOption... options);
@@ -548,6 +568,10 @@ ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
548568
* applied to any other requests on the transaction.
549569
* <li>{@link Options#commitStats()}: Request that the server includes commit statistics in the
550570
* {@link CommitResponse}.
571+
* <li>{@link Options#repeatableReadIsolationLevel()}: Request Repeatable Read Isolation Level
572+
* from the backend.
573+
* <li>{@link Options#serializableIsolationLevel()}: Request Serializable Isolation Level from
574+
* the backend.
551575
* </ul>
552576
*/
553577
AsyncTransactionManager transactionManagerAsync(TransactionOption... options);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,13 @@
2121
import com.google.spanner.v1.ReadRequest.LockHint;
2222
import com.google.spanner.v1.ReadRequest.OrderBy;
2323
import com.google.spanner.v1.RequestOptions.Priority;
24+
import com.google.spanner.v1.TransactionOptions.IsolationLevel;
2425
import java.io.Serializable;
2526
import java.time.Duration;
27+
import java.util.Arrays;
2628
import java.util.Objects;
29+
import java.util.function.Predicate;
30+
import java.util.stream.Stream;
2731

2832
/** Specifies options for various spanner operations */
2933
public final class Options implements Serializable {
@@ -131,7 +135,29 @@ public interface UpdateAdminApiOption extends AdminApiOption {}
131135
public interface QueryOption {}
132136

133137
/** Marker interface to mark options applicable to write operations */
134-
public interface TransactionOption {}
138+
public interface TransactionOption {
139+
Predicate<TransactionOption> isValidIsolationLevelOption =
140+
txnOption ->
141+
txnOption instanceof RepeatableReadOption || txnOption instanceof SerializableOption;
142+
143+
/**
144+
* Combines two arrays of TransactionOption, with primaryOptions taking precedence in case of
145+
* conflicts. Note that {@link
146+
* com.google.cloud.spanner.SpannerOptions.Builder.TransactionOptions} supports only the {@link
147+
* IsolationLevel} TransactionOption, meaning spannerOptions will contain a maximum of one
148+
* TransactionOption.
149+
*/
150+
static TransactionOption[] combine(
151+
TransactionOption[] primaryOptions, TransactionOption[] spannerOptions) {
152+
if (spannerOptions == null
153+
|| Arrays.stream(primaryOptions).anyMatch(isValidIsolationLevelOption)) {
154+
return primaryOptions;
155+
} else {
156+
return Stream.concat(Arrays.stream(primaryOptions), Arrays.stream(spannerOptions))
157+
.toArray(TransactionOption[]::new);
158+
}
159+
}
160+
}
135161

136162
/** Marker interface to mark options applicable to update operation. */
137163
public interface UpdateOption {}
@@ -159,6 +185,22 @@ public static TransactionOption optimisticLock() {
159185
return OPTIMISTIC_LOCK_OPTION;
160186
}
161187

188+
/**
189+
* Specifying this instructs the transaction to request Repeatable Read Isolation Level from the
190+
* backend.
191+
*/
192+
public static TransactionOption repeatableReadIsolationLevel() {
193+
return REPEATABLE_READ_OPTION;
194+
}
195+
196+
/**
197+
* Specifying this instructs the transaction to request Serializable Isolation Level from the
198+
* backend.
199+
*/
200+
public static TransactionOption serializableIsolationLevel() {
201+
return SERIALIZABLE_OPTION;
202+
}
203+
162204
/**
163205
* Specifying this instructs the transaction to be excluded from being recorded in change streams
164206
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
@@ -490,6 +532,26 @@ void appendToOptions(Options options) {
490532
}
491533
}
492534

535+
/** Option to request REPEATABLE READ isolation level for read/write transactions. */
536+
static final class RepeatableReadOption extends InternalOption implements TransactionOption {
537+
@Override
538+
void appendToOptions(Options options) {
539+
options.isolationLevel = IsolationLevel.REPEATABLE_READ;
540+
}
541+
}
542+
543+
static final RepeatableReadOption REPEATABLE_READ_OPTION = new RepeatableReadOption();
544+
545+
/** Option to request SERIALIZABLE isolation level for read/write transactions. */
546+
static final class SerializableOption extends InternalOption implements TransactionOption {
547+
@Override
548+
void appendToOptions(Options options) {
549+
options.isolationLevel = IsolationLevel.SERIALIZABLE;
550+
}
551+
}
552+
553+
static final SerializableOption SERIALIZABLE_OPTION = new SerializableOption();
554+
493555
private boolean withCommitStats;
494556

495557
private Duration maxCommitDelay;
@@ -512,6 +574,7 @@ void appendToOptions(Options options) {
512574
private RpcOrderBy orderBy;
513575
private RpcLockHint lockHint;
514576
private Boolean lastStatement;
577+
private IsolationLevel isolationLevel;
515578

516579
// Construction is via factory methods below.
517580
private Options() {}
@@ -664,6 +727,10 @@ LockHint lockHint() {
664727
return lockHint == null ? null : lockHint.proto;
665728
}
666729

730+
IsolationLevel isolationLevel() {
731+
return isolationLevel;
732+
}
733+
667734
@Override
668735
public String toString() {
669736
StringBuilder b = new StringBuilder();
@@ -726,6 +793,9 @@ public String toString() {
726793
if (lockHint != null) {
727794
b.append("lockHint: ").append(lockHint).append(' ');
728795
}
796+
if (isolationLevel != null) {
797+
b.append("isolationLevel: ").append(isolationLevel).append(' ');
798+
}
729799
return b.toString();
730800
}
731801

@@ -767,7 +837,8 @@ public boolean equals(Object o) {
767837
&& Objects.equals(directedReadOptions(), that.directedReadOptions())
768838
&& Objects.equals(orderBy(), that.orderBy())
769839
&& Objects.equals(isLastStatement(), that.isLastStatement())
770-
&& Objects.equals(lockHint(), that.lockHint());
840+
&& Objects.equals(lockHint(), that.lockHint())
841+
&& Objects.equals(isolationLevel(), that.isolationLevel());
771842
}
772843

773844
@Override
@@ -833,6 +904,9 @@ public int hashCode() {
833904
if (lockHint != null) {
834905
result = 31 * result + lockHint.hashCode();
835906
}
907+
if (isolationLevel != null) {
908+
result = 31 * result + isolationLevel.hashCode();
909+
}
836910
return result;
837911
}
838912

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,9 @@ static TransactionOptions createReadWriteTransactionOptions(
8383
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
8484
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
8585
}
86+
if (options.isolationLevel() != null) {
87+
transactionOptions.setIsolationLevel(options.isolationLevel());
88+
}
8689
transactionOptions.setReadWrite(readWrite);
8790
return transactionOptions.build();
8891
}
@@ -239,7 +242,10 @@ public CommitResponse writeAtLeastOnceWithOptions(
239242
setActive(null);
240243
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
241244
Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
242-
Options options = Options.fromTransactionOptions(transactionOptions);
245+
Options options =
246+
Options.fromTransactionOptions(
247+
TransactionOption.combine(
248+
transactionOptions, this.spanner.getOptions().getTransactionOptions()));
243249
final CommitRequest.Builder requestBuilder =
244250
CommitRequest.newBuilder()
245251
.setSession(getName())
@@ -252,6 +258,9 @@ public CommitResponse writeAtLeastOnceWithOptions(
252258
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
253259
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
254260
}
261+
if (options.isolationLevel() != null) {
262+
transactionOptionsBuilder.setIsolationLevel(options.isolationLevel());
263+
}
255264
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);
256265

257266
if (options.hasMaxCommitDelay()) {
@@ -396,22 +405,37 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) {
396405

397406
@Override
398407
public TransactionRunner readWriteTransaction(TransactionOption... options) {
399-
return setActive(new TransactionRunnerImpl(this, options));
408+
return setActive(
409+
new TransactionRunnerImpl(
410+
this,
411+
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions())));
400412
}
401413

402414
@Override
403415
public AsyncRunner runAsync(TransactionOption... options) {
404-
return new AsyncRunnerImpl(setActive(new TransactionRunnerImpl(this, options)));
416+
return new AsyncRunnerImpl(
417+
setActive(
418+
new TransactionRunnerImpl(
419+
this,
420+
TransactionOption.combine(
421+
options, this.spanner.getOptions().getTransactionOptions()))));
405422
}
406423

407424
@Override
408425
public TransactionManager transactionManager(TransactionOption... options) {
409-
return new TransactionManagerImpl(this, currentSpan, tracer, options);
426+
return new TransactionManagerImpl(
427+
this,
428+
currentSpan,
429+
tracer,
430+
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions()));
410431
}
411432

412433
@Override
413434
public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption... options) {
414-
return new AsyncTransactionManagerImpl(this, currentSpan, options);
435+
return new AsyncTransactionManagerImpl(
436+
this,
437+
currentSpan,
438+
TransactionOption.combine(options, this.spanner.getOptions().getTransactionOptions()));
415439
}
416440

417441
@Override

0 commit comments

Comments
 (0)