Skip to content

Commit c15339f

Browse files
authored
chore(spanner): support mutation only operation for read-write mux (#3423)
* chore(spanner): support Mutation only case FOR R/W mux * chore(spanner): unit test for selecting random mutation * chore(spanner): support precommit token for mutation only in mock spanner and mock spanner tests * chore(spanner): commit for debuging flakkiness * chore(spanner): debug flakiness * chore(spanner): update logic * chore(spanner): lint fix * chore(spanner): review comments * chore(spanner): handle empty insert mutation case * chore(spanner): add helper methods * chore(spanner): review comments
1 parent 79d7239 commit c15339f

File tree

10 files changed

+390
-36
lines changed

10 files changed

+390
-36
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Mutation.java

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,15 @@
2222
import com.google.common.collect.ImmutableList;
2323
import com.google.protobuf.ListValue;
2424
import java.io.Serializable;
25+
import java.util.ArrayList;
2526
import java.util.Collections;
2627
import java.util.HashSet;
2728
import java.util.LinkedHashMap;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
3132
import java.util.Set;
33+
import java.util.concurrent.ThreadLocalRandom;
3234
import javax.annotation.Nullable;
3335

3436
/**
@@ -402,20 +404,46 @@ private boolean isFloat32NaN(Value value) {
402404
return value.getType().equals(Type.float32()) && Float.isNaN(value.getFloat32());
403405
}
404406

405-
static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
407+
/**
408+
* Converts the list of mutations to the corresponding protobuf mutations and returns a random
409+
* mutation from the available list based on the following heuristics:
410+
*
411+
* <ol>
412+
* <li>1. Prefer mutations other than INSERT, as INSERT mutations may contain autogenerated
413+
* columns whose information is unavailable on the client.
414+
* <li>If the list only contains INSERT mutations, select the one with the highest number of
415+
* values.
416+
* </ol>
417+
*/
418+
static com.google.spanner.v1.Mutation toProtoAndReturnRandomMutation(
419+
Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
406420
Mutation last = null;
407421
// The mutation currently being built.
408422
com.google.spanner.v1.Mutation.Builder proto = null;
409423
// The "write" (!= DELETE) or "keySet" (==DELETE) for the last mutation encoded, for coalescing.
410424
com.google.spanner.v1.Mutation.Write.Builder write = null;
411425
com.google.spanner.v1.KeySet.Builder keySet = null;
426+
427+
// Stores all the mutations excluding INSERT mutations.
428+
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert = new ArrayList<>();
429+
// Stores the INSERT mutation with largest number of values.
430+
com.google.spanner.v1.Mutation largestInsertMutation =
431+
com.google.spanner.v1.Mutation.getDefaultInstance();
432+
412433
for (Mutation mutation : mutations) {
413434
if (mutation.operation == Op.DELETE) {
414435
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
415436
mutation.keySet.appendToProto(keySet);
416437
} else {
417438
if (proto != null) {
418-
out.add(proto.build());
439+
com.google.spanner.v1.Mutation builtMutation = proto.build();
440+
out.add(builtMutation);
441+
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
442+
if (allMutationsExcludingInsert.isEmpty()
443+
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
444+
largestInsertMutation = builtMutation;
445+
}
446+
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
419447
}
420448
proto = com.google.spanner.v1.Mutation.newBuilder();
421449
com.google.spanner.v1.Mutation.Delete.Builder delete =
@@ -437,7 +465,14 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
437465
write.addValues(values);
438466
} else {
439467
if (proto != null) {
440-
out.add(proto.build());
468+
com.google.spanner.v1.Mutation builtMutation = proto.build();
469+
out.add(builtMutation);
470+
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
471+
if (allMutationsExcludingInsert.isEmpty()
472+
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
473+
largestInsertMutation = builtMutation;
474+
}
475+
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
441476
}
442477
proto = com.google.spanner.v1.Mutation.newBuilder();
443478
switch (mutation.operation) {
@@ -464,7 +499,46 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
464499
}
465500
// Flush last item.
466501
if (proto != null) {
502+
com.google.spanner.v1.Mutation builtMutation = proto.build();
467503
out.add(proto.build());
504+
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
505+
if (allMutationsExcludingInsert.isEmpty()
506+
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
507+
largestInsertMutation = builtMutation;
508+
}
509+
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
510+
}
511+
512+
// Select a random mutation based on the heuristic.
513+
if (!allMutationsExcludingInsert.isEmpty()) {
514+
return allMutationsExcludingInsert.get(
515+
ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size()));
516+
} else {
517+
return largestInsertMutation;
518+
}
519+
}
520+
521+
// Returns true if the input mutation is of type INSERT and has more values than the current
522+
// largest insert mutation.
523+
private static boolean checkIfInsertMutationWithLargeValue(
524+
com.google.spanner.v1.Mutation mutation,
525+
com.google.spanner.v1.Mutation largestInsertMutation) {
526+
// If largestInsertMutation is a default instance of Mutation, replace it with the current
527+
// INSERT mutation, even if it contains zero values.
528+
if (mutation.hasInsert() && !largestInsertMutation.hasInsert()) {
529+
return true;
530+
}
531+
return mutation.hasInsert()
532+
&& mutation.getInsert().getValuesCount()
533+
> largestInsertMutation.getInsert().getValuesCount();
534+
}
535+
536+
// Stores all mutations that are not of type INSERT.
537+
private static void maybeAddMutationToListExcludingInserts(
538+
com.google.spanner.v1.Mutation mutation,
539+
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert) {
540+
if (!mutation.hasInsert()) {
541+
allMutationsExcludingInsert.add(mutation);
468542
}
469543
}
470544
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MutationGroup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public ImmutableList<Mutation> getMutations() {
4848

4949
static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
5050
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
51-
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
51+
Mutation.toProtoAndReturnRandomMutation(mutationGroup.getMutations(), mutationsProto);
5252
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
5353
}
5454

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
238238
throws SpannerException {
239239
setActive(null);
240240
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
241-
Mutation.toProto(mutations, mutationsProto);
241+
Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
242242
Options options = Options.fromTransactionOptions(transactionOptions);
243243
final CommitRequest.Builder requestBuilder =
244244
CommitRequest.newBuilder()
@@ -431,19 +431,23 @@ public void close() {
431431
}
432432
}
433433

434-
ApiFuture<ByteString> beginTransactionAsync(
434+
ApiFuture<Transaction> beginTransactionAsync(
435435
Options transactionOptions,
436436
boolean routeToLeader,
437437
Map<SpannerRpc.Option, ?> channelHint,
438-
ByteString previousTransactionId) {
439-
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
438+
ByteString previousTransactionId,
439+
com.google.spanner.v1.Mutation mutation) {
440+
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
440441
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
441-
final BeginTransactionRequest request =
442+
BeginTransactionRequest.Builder requestBuilder =
442443
BeginTransactionRequest.newBuilder()
443444
.setSession(getName())
444445
.setOptions(
445-
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
446-
.build();
446+
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
447+
if (sessionReference.getIsMultiplexed() && mutation != null) {
448+
requestBuilder.setMutationKey(mutation);
449+
}
450+
final BeginTransactionRequest request = requestBuilder.build();
447451
final ApiFuture<Transaction> requestFuture;
448452
try (IScope ignore = tracer.withSpan(span)) {
449453
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
@@ -457,7 +461,7 @@ ApiFuture<ByteString> beginTransactionAsync(
457461
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
458462
}
459463
span.end();
460-
res.set(txn.getId());
464+
res.set(txn);
461465
} catch (ExecutionException e) {
462466
span.setStatus(e);
463467
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ void ensureTxn() {
289289
ApiFuture<Void> ensureTxnAsync() {
290290
final SettableApiFuture<Void> res = SettableApiFuture.create();
291291
if (transactionId == null || isAborted()) {
292-
createTxnAsync(res);
292+
createTxnAsync(res, null);
293293
} else {
294294
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
295295
txnLogger.log(
@@ -301,20 +301,29 @@ ApiFuture<Void> ensureTxnAsync() {
301301
return res;
302302
}
303303

304-
private void createTxnAsync(final SettableApiFuture<Void> res) {
304+
private void createTxnAsync(
305+
final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
305306
span.addAnnotation("Creating Transaction");
306-
final ApiFuture<ByteString> fut =
307+
final ApiFuture<Transaction> fut =
307308
session.beginTransactionAsync(
308-
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
309+
options,
310+
isRouteToLeader(),
311+
getTransactionChannelHint(),
312+
getPreviousTransactionId(),
313+
mutation);
309314
fut.addListener(
310315
() -> {
311316
try {
312-
transactionId = fut.get();
317+
Transaction txn = fut.get();
318+
transactionId = txn.getId();
313319
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
314320
txnLogger.log(
315321
Level.FINER,
316322
"Started transaction {0}",
317323
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
324+
if (txn.hasPrecommitToken()) {
325+
onPrecommitToken(txn.getPrecommitToken());
326+
}
318327
res.set(null);
319328
} catch (ExecutionException e) {
320329
span.addAnnotation(
@@ -357,13 +366,14 @@ ApiFuture<CommitResponse> commitAsync() {
357366
close();
358367

359368
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
369+
com.google.spanner.v1.Mutation randomMutation = null;
360370
synchronized (committingLock) {
361371
if (committing) {
362372
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
363373
}
364374
committing = true;
365375
if (!mutations.isEmpty()) {
366-
Mutation.toProto(mutations, mutationsProto);
376+
randomMutation = Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
367377
}
368378
}
369379
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
@@ -392,7 +402,7 @@ ApiFuture<CommitResponse> commitAsync() {
392402
synchronized (lock) {
393403
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
394404
finishOps = SettableApiFuture.create();
395-
createTxnAsync(finishOps);
405+
createTxnAsync(finishOps, randomMutation);
396406
} else {
397407
finishOps = finishedAsyncOperations;
398408
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,7 +1828,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
18281828
transactionId = null;
18291829
break;
18301830
case BEGIN:
1831-
transactionId = beginTransaction(session, tx.getBegin()).getId();
1831+
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
18321832
break;
18331833
case ID:
18341834
Transaction transaction = transactions.get(tx.getId());
@@ -1883,7 +1883,8 @@ public void beginTransaction(
18831883
try {
18841884
beginTransactionExecutionTime.simulateExecutionTime(
18851885
exceptions, stickyGlobalExceptions, freezeLock);
1886-
Transaction transaction = beginTransaction(session, request.getOptions());
1886+
Transaction transaction =
1887+
beginTransaction(session, request.getOptions(), request.getMutationKey());
18871888
responseObserver.onNext(transaction);
18881889
responseObserver.onCompleted();
18891890
} catch (StatusRuntimeException t) {
@@ -1893,12 +1894,19 @@ public void beginTransaction(
18931894
}
18941895
}
18951896

1896-
private Transaction beginTransaction(Session session, TransactionOptions options) {
1897-
Transaction.Builder builder =
1898-
Transaction.newBuilder().setId(generateTransactionName(session.getName()));
1897+
private Transaction beginTransaction(
1898+
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
1899+
ByteString transactionId = generateTransactionName(session.getName());
1900+
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
18991901
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
19001902
setReadTimestamp(options, builder);
19011903
}
1904+
if (session.getMultiplexed()
1905+
&& options.getModeCase() == ModeCase.READ_WRITE
1906+
&& mutationKey != null) {
1907+
// Mutation only case in a read-write transaction.
1908+
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
1909+
}
19021910
Transaction transaction = builder.build();
19031911
transactions.put(transaction.getId(), transaction);
19041912
transactionsStarted.add(transaction.getId());
@@ -2005,7 +2013,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
20052013
session,
20062014
TransactionOptions.newBuilder()
20072015
.setReadWrite(ReadWrite.getDefaultInstance())
2008-
.build());
2016+
.build(),
2017+
null);
20092018
} else if (request.getTransactionId() != null) {
20102019
transaction = transactions.get(request.getTransactionId());
20112020
Optional<Boolean> aborted =
@@ -2490,6 +2499,10 @@ Session getSession(String name) {
24902499
return null;
24912500
}
24922501

2502+
static MultiplexedSessionPrecommitToken getTransactionPrecommitToken(ByteString transactionId) {
2503+
return getPrecommitToken("TransactionPrecommitToken", transactionId);
2504+
}
2505+
24932506
static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
24942507
return getPrecommitToken("ResultSetPrecommitToken", transactionId);
24952508
}

0 commit comments

Comments
 (0)