Skip to content

Commit b3fc7c4

Browse files
ko3a4okgcf-owl-bot[bot]rajatbhatta
authored
feat: add support for Optimistic Concurrency Control (#2142)
* feat: add support for Optimistic Concurrency Control Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com> Co-authored-by: Rajat Bhatta <[email protected]>
1 parent d5f5237 commit b3fc7c4

File tree

7 files changed

+171
-10
lines changed

7 files changed

+171
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,17 @@ public static TransactionOption commitStats() {
8787
return COMMIT_STATS_OPTION;
8888
}
8989

90+
/**
91+
* Specifying this instructs the transaction to request Optimistic Lock from the backend. In this
92+
* concurrency mode, operations during the execution phase, i.e., reads and queries, are performed
93+
* without acquiring locks, and transactional consistency is ensured by running a validation
94+
* process in the commit phase (when any needed locks are acquired). The validation process
95+
* succeeds only if there are no conflicting committed transactions (that committed mutations to
96+
* the read data at a commit timestamp after the read timestamp).
97+
*/
98+
public static TransactionOption optimisticLock() {
99+
return OPTIMISTIC_LOCK_OPTION;
100+
}
90101
/**
91102
* Specifying this will cause the read to yield at most this many rows. This should be greater
92103
* than 0.
@@ -207,6 +218,16 @@ void appendToOptions(Options options) {
207218

208219
static final CommitStatsOption COMMIT_STATS_OPTION = new CommitStatsOption();
209220

221+
/** Option to request Optimistic Concurrency Control for read/write transactions. */
222+
static final class OptimisticLockOption extends InternalOption implements TransactionOption {
223+
@Override
224+
void appendToOptions(Options options) {
225+
options.withOptimisticLock = true;
226+
}
227+
}
228+
229+
static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();
230+
210231
/** Option pertaining to flow control. */
211232
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
212233
final int prefetchChunks;
@@ -299,6 +320,7 @@ void appendToOptions(Options options) {
299320
private String tag;
300321
private String etag;
301322
private Boolean validateOnly;
323+
private Boolean withOptimisticLock;
302324

303325
// Construction is via factory methods below.
304326
private Options() {}
@@ -387,6 +409,10 @@ Boolean validateOnly() {
387409
return validateOnly;
388410
}
389411

412+
Boolean withOptimisticLock() {
413+
return withOptimisticLock;
414+
}
415+
390416
@Override
391417
public String toString() {
392418
StringBuilder b = new StringBuilder();
@@ -420,6 +446,9 @@ public String toString() {
420446
if (validateOnly != null) {
421447
b.append("validateOnly: ").append(validateOnly).append(' ');
422448
}
449+
if (withOptimisticLock != null) {
450+
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
451+
}
423452
return b.toString();
424453
}
425454

@@ -453,7 +482,8 @@ public boolean equals(Object o) {
453482
&& Objects.equals(priority(), that.priority())
454483
&& Objects.equals(tag(), that.tag())
455484
&& Objects.equals(etag(), that.etag())
456-
&& Objects.equals(validateOnly(), that.validateOnly());
485+
&& Objects.equals(validateOnly(), that.validateOnly())
486+
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock());
457487
}
458488

459489
@Override
@@ -492,6 +522,9 @@ public int hashCode() {
492522
if (validateOnly != null) {
493523
result = 31 * result + validateOnly.hashCode();
494524
}
525+
if (withOptimisticLock != null) {
526+
result = 31 * result + withOptimisticLock.hashCode();
527+
}
495528
return result;
496529
}
497530

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ static void throwIfTransactionsPending() {
6767
}
6868
}
6969

70+
static TransactionOptions createReadWriteTransactionOptions(Options options) {
71+
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
72+
if (options.withOptimisticLock() == Boolean.TRUE) {
73+
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
74+
}
75+
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
76+
}
77+
7078
/**
7179
* Represents a transaction within a session. "Transaction" here is used in the general sense,
7280
* which covers standalone reads, standalone writes, single-use and multi-use read-only
@@ -299,14 +307,16 @@ ByteString beginTransaction() {
299307
}
300308

301309
ApiFuture<ByteString> beginTransactionAsync() {
310+
return beginTransactionAsync(Options.fromTransactionOptions());
311+
}
312+
313+
ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions) {
302314
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
303315
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
304316
final BeginTransactionRequest request =
305317
BeginTransactionRequest.newBuilder()
306318
.setSession(name)
307-
.setOptions(
308-
TransactionOptions.newBuilder()
309-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
319+
.setOptions(createReadWriteTransactionOptions(transactionOptions))
310320
.build();
311321
final ApiFuture<Transaction> requestFuture =
312322
spanner.getRpc().beginTransactionAsync(request, options);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ ApiFuture<Void> ensureTxnAsync() {
255255

256256
private void createTxnAsync(final SettableApiFuture<Void> res) {
257257
span.addAnnotation("Creating Transaction");
258-
final ApiFuture<ByteString> fut = session.beginTransactionAsync();
258+
final ApiFuture<ByteString> fut = session.beginTransactionAsync(options);
259259
fut.addListener(
260260
() -> {
261261
try {
@@ -493,9 +493,7 @@ TransactionSelector getTransactionSelector() {
493493
}
494494
if (tx == null) {
495495
return TransactionSelector.newBuilder()
496-
.setBegin(
497-
TransactionOptions.newBuilder()
498-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
496+
.setBegin(SessionImpl.createReadWriteTransactionOptions(options))
499497
.build();
500498
} else {
501499
// Wait for the transaction to come available. The tx.get() call will fail with an

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,4 +561,24 @@ public void transactionEquality() {
561561
o3 = Options.fromTransactionOptions(Options.tag("app=spanner,env=stage"));
562562
assertThat(o2.equals(o3)).isFalse();
563563
}
564+
565+
@Test
566+
public void optimisticLockEquality() {
567+
Options option1 = Options.fromTransactionOptions(Options.optimisticLock());
568+
Options option2 = Options.fromTransactionOptions(Options.optimisticLock());
569+
Options option3 = Options.fromReadOptions();
570+
571+
assertEquals(option1, option2);
572+
assertNotEquals(option1, option3);
573+
}
574+
575+
@Test
576+
public void optimisticLockHashCode() {
577+
Options option1 = Options.fromTransactionOptions(Options.optimisticLock());
578+
Options option2 = Options.fromTransactionOptions(Options.optimisticLock());
579+
Options option3 = Options.fromReadOptions();
580+
581+
assertEquals(option1.hashCode(), option2.hashCode());
582+
assertNotEquals(option1.hashCode(), option3.hashCode());
583+
}
564584
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadWriteTransactionWithInlineBeginTest.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,33 @@
2323
import com.google.api.gax.grpc.testing.LocalChannelProvider;
2424
import com.google.cloud.NoCredentials;
2525
import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult;
26+
import com.google.common.collect.Iterables;
2627
import com.google.protobuf.AbstractMessage;
2728
import com.google.protobuf.ListValue;
2829
import com.google.spanner.v1.BeginTransactionRequest;
30+
import com.google.spanner.v1.ExecuteSqlRequest;
31+
import com.google.spanner.v1.ReadRequest;
2932
import com.google.spanner.v1.ResultSetMetadata;
3033
import com.google.spanner.v1.StructType;
3134
import com.google.spanner.v1.StructType.Field;
35+
import com.google.spanner.v1.TransactionOptions;
36+
import com.google.spanner.v1.TransactionOptions.ReadWrite;
3237
import com.google.spanner.v1.TypeCode;
3338
import io.grpc.Server;
3439
import io.grpc.Status;
3540
import io.grpc.inprocess.InProcessServerBuilder;
3641
import java.io.IOException;
3742
import java.util.ArrayList;
3843
import java.util.Arrays;
44+
import java.util.Collection;
45+
import java.util.Collections;
3946
import java.util.List;
4047
import java.util.concurrent.ExecutorService;
4148
import java.util.concurrent.Executors;
4249
import java.util.concurrent.Future;
4350
import java.util.concurrent.ScheduledThreadPoolExecutor;
4451
import java.util.concurrent.atomic.AtomicInteger;
52+
import java.util.stream.Collectors;
4553
import org.junit.After;
4654
import org.junit.AfterClass;
4755
import org.junit.Before;
@@ -85,6 +93,10 @@ public class ReadWriteTransactionWithInlineBeginTest {
8593
.build())
8694
.setMetadata(SELECT1_METADATA)
8795
.build();
96+
private static final TransactionOptions OPTIMISTIC_LOCK_OPTIONS =
97+
TransactionOptions.newBuilder()
98+
.setReadWrite(ReadWrite.newBuilder().setReadLockMode(ReadWrite.ReadLockMode.OPTIMISTIC))
99+
.build();
88100
private Spanner spanner;
89101
private DatabaseClient client;
90102

@@ -102,6 +114,9 @@ public static void startStaticServer() throws IOException {
102114
StatementResult.exception(
103115
INVALID_SELECT_STATEMENT,
104116
Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException()));
117+
mockSpanner.putStatementResult(
118+
StatementResult.read(
119+
"FOO", KeySet.all(), Collections.singletonList("ID"), SELECT1_RESULTSET));
105120

106121
String uniqueName = InProcessServerBuilder.generateName();
107122
server =
@@ -389,6 +404,76 @@ public void failedBatchUpdateAndThenUpdate() {
389404
assertThat(countTransactionsStarted()).isEqualTo(2);
390405
}
391406

407+
@Test
408+
public void executeSqlWithOptimisticConcurrencyControl() {
409+
client
410+
.readWriteTransaction(Options.optimisticLock())
411+
.run(
412+
transaction -> {
413+
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
414+
while (rs.next()) {
415+
assertEquals(rs.getLong(0), 1);
416+
}
417+
}
418+
return null;
419+
});
420+
Collection<AbstractMessage> requests =
421+
mockSpanner.getRequests().stream()
422+
.filter(msg -> msg.getClass().equals(ExecuteSqlRequest.class))
423+
.collect(Collectors.toList());
424+
assertEquals(requests.size(), 1);
425+
ExecuteSqlRequest request = (ExecuteSqlRequest) Iterables.getOnlyElement(requests);
426+
assertEquals(request.getTransaction().getBegin(), OPTIMISTIC_LOCK_OPTIONS);
427+
}
428+
429+
@Test
430+
public void readWithOptimisticConcurrencyControl() {
431+
client
432+
.readWriteTransaction(Options.optimisticLock())
433+
.run(
434+
transaction -> {
435+
try (ResultSet rs =
436+
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
437+
while (rs.next()) {
438+
assertEquals(rs.getLong(0), 1);
439+
}
440+
}
441+
return null;
442+
});
443+
Collection<AbstractMessage> requests =
444+
mockSpanner.getRequests().stream()
445+
.filter(msg -> msg.getClass().equals(ReadRequest.class))
446+
.collect(Collectors.toList());
447+
assertEquals(requests.size(), 1);
448+
ReadRequest request = (ReadRequest) Iterables.getOnlyElement(requests);
449+
assertThat(request.getTransaction().getBegin()).isEqualTo(OPTIMISTIC_LOCK_OPTIONS);
450+
}
451+
452+
@Test
453+
public void beginTransactionWithOptimisticConcurrencyControl() {
454+
client
455+
.readWriteTransaction(Options.optimisticLock())
456+
.run(
457+
transaction -> {
458+
// Instead of adding the BeginTransaction option to the next statement, the client
459+
// library will force a complete retry of the entire transaction, and use an explicit
460+
// BeginTransaction RPC invocation for that transaction in order to include the failed
461+
// statement in the transaction as well.
462+
try (ResultSet rs = transaction.executeQuery(INVALID_SELECT_STATEMENT)) {
463+
SpannerException e = assertThrows(SpannerException.class, () -> rs.next());
464+
assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode());
465+
}
466+
return transaction.executeUpdate(UPDATE_STATEMENT);
467+
});
468+
Collection<AbstractMessage> requests =
469+
mockSpanner.getRequests().stream()
470+
.filter(msg -> msg.getClass().equals(BeginTransactionRequest.class))
471+
.collect(Collectors.toList());
472+
assertEquals(requests.size(), 1);
473+
BeginTransactionRequest request = (BeginTransactionRequest) Iterables.getOnlyElement(requests);
474+
assertEquals(request.getOptions(), OPTIMISTIC_LOCK_OPTIONS);
475+
}
476+
392477
@Test
393478
public void failedQueryAndThenUpdate() {
394479
Long updateCount =

google-cloud-spanner/src/test/java/com/google/cloud/spanner/RetryOnInvalidatedSessionTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,21 @@ public void readWriteTransactionRead() throws InterruptedException {
530530
}));
531531
}
532532

533+
@Test
534+
public void readWriteTransactionReadWithOptimisticLock() throws InterruptedException {
535+
TransactionRunner runner = client.readWriteTransaction(Options.optimisticLock());
536+
assertThrowsSessionNotFoundIfShouldFail(
537+
() ->
538+
runner.run(
539+
transaction -> {
540+
try (ResultSet rs =
541+
transaction.read("FOO", KeySet.all(), Collections.singletonList("BAR"))) {
542+
while (rs.next()) {}
543+
}
544+
return null;
545+
}));
546+
}
547+
533548
@Test
534549
public void readWriteTransactionReadUsingIndex() throws InterruptedException {
535550
TransactionRunner runner = client.readWriteTransaction();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionPoolTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,7 @@ public void testSessionNotFoundReadWriteTransaction() {
785785
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
786786
when(closedSession.newTransaction(Options.fromTransactionOptions()))
787787
.thenReturn(closedTransactionContext);
788-
when(closedSession.beginTransactionAsync()).thenThrow(sessionNotFound);
788+
when(closedSession.beginTransactionAsync(any())).thenThrow(sessionNotFound);
789789
TransactionRunnerImpl closedTransactionRunner = new TransactionRunnerImpl(closedSession);
790790
closedTransactionRunner.setSpan(mock(Span.class));
791791
when(closedSession.readWriteTransaction()).thenReturn(closedTransactionRunner);
@@ -798,7 +798,7 @@ public void testSessionNotFoundReadWriteTransaction() {
798798
final TransactionContextImpl openTransactionContext = mock(TransactionContextImpl.class);
799799
when(openSession.newTransaction(Options.fromTransactionOptions()))
800800
.thenReturn(openTransactionContext);
801-
when(openSession.beginTransactionAsync())
801+
when(openSession.beginTransactionAsync(any()))
802802
.thenReturn(ApiFutures.immediateFuture(ByteString.copyFromUtf8("open-txn")));
803803
TransactionRunnerImpl openTransactionRunner = new TransactionRunnerImpl(openSession);
804804
openTransactionRunner.setSpan(mock(Span.class));

0 commit comments

Comments
 (0)