Skip to content

chore(spanner): support mutation only operation for read-write mux #3423

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Oct 27, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ListValue;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

/**
Expand Down Expand Up @@ -402,20 +404,46 @@ private boolean isFloat32NaN(Value value) {
return value.getType().equals(Type.float32()) && Float.isNaN(value.getFloat32());
}

static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
/**
* Converts the list of mutations to the corresponding protobuf mutations and returns a random
* mutation from the available list based on the following heuristics:
*
* <ol>
* <li>1. Prefer mutations other than INSERT, as INSERT mutations may contain autogenerated
* columns whose information is unavailable on the client.
* <li>If the list only contains INSERT mutations, select the one with the highest number of
* values.
* </ol>
*/
static com.google.spanner.v1.Mutation toProtoAndReturnRandomMutation(
Iterable<Mutation> mutations, List<com.google.spanner.v1.Mutation> out) {
Mutation last = null;
// The mutation currently being built.
com.google.spanner.v1.Mutation.Builder proto = null;
// The "write" (!= DELETE) or "keySet" (==DELETE) for the last mutation encoded, for coalescing.
com.google.spanner.v1.Mutation.Write.Builder write = null;
com.google.spanner.v1.KeySet.Builder keySet = null;

// Stores all the mutations excluding INSERT mutations.
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert = new ArrayList<>();
// Stores the INSERT mutation with largest number of values.
com.google.spanner.v1.Mutation largestInsertMutation =
com.google.spanner.v1.Mutation.getDefaultInstance();

for (Mutation mutation : mutations) {
if (mutation.operation == Op.DELETE) {
if (last != null && last.operation == Op.DELETE && mutation.table.equals(last.table)) {
mutation.keySet.appendToProto(keySet);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
if (allMutationsExcludingInsert.isEmpty()
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
}
proto = com.google.spanner.v1.Mutation.newBuilder();
com.google.spanner.v1.Mutation.Delete.Builder delete =
Expand All @@ -437,7 +465,14 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
write.addValues(values);
} else {
if (proto != null) {
out.add(proto.build());
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(builtMutation);
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
if (allMutationsExcludingInsert.isEmpty()
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
}
proto = com.google.spanner.v1.Mutation.newBuilder();
switch (mutation.operation) {
Expand All @@ -464,7 +499,46 @@ static void toProto(Iterable<Mutation> mutations, List<com.google.spanner.v1.Mut
}
// Flush last item.
if (proto != null) {
com.google.spanner.v1.Mutation builtMutation = proto.build();
out.add(proto.build());
// Skip tracking the largest insert mutation if there are mutations other than INSERT.
if (allMutationsExcludingInsert.isEmpty()
&& checkIfInsertMutationWithLargeValue(builtMutation, largestInsertMutation)) {
largestInsertMutation = builtMutation;
}
maybeAddMutationToListExcludingInserts(builtMutation, allMutationsExcludingInsert);
}

// Select a random mutation based on the heuristic.
if (!allMutationsExcludingInsert.isEmpty()) {
return allMutationsExcludingInsert.get(
ThreadLocalRandom.current().nextInt(allMutationsExcludingInsert.size()));
} else {
return largestInsertMutation;
}
}

// Returns true if the input mutation is of type INSERT and has more values than the current
// largest insert mutation.
private static boolean checkIfInsertMutationWithLargeValue(
com.google.spanner.v1.Mutation mutation,
com.google.spanner.v1.Mutation largestInsertMutation) {
// If largestInsertMutation is a default instance of Mutation, replace it with the current
// INSERT mutation, even if it contains zero values.
if (mutation.hasInsert() && !largestInsertMutation.hasInsert()) {
return true;
}
return mutation.hasInsert()
&& mutation.getInsert().getValuesCount()
> largestInsertMutation.getInsert().getValuesCount();
}

// Stores all mutations that are not of type INSERT.
private static void maybeAddMutationToListExcludingInserts(
com.google.spanner.v1.Mutation mutation,
List<com.google.spanner.v1.Mutation> allMutationsExcludingInsert) {
if (!mutation.hasInsert()) {
allMutationsExcludingInsert.add(mutation);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public ImmutableList<Mutation> getMutations() {

static BatchWriteRequest.MutationGroup toProto(final MutationGroup mutationGroup) {
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutationGroup.getMutations(), mutationsProto);
Mutation.toProtoAndReturnRandomMutation(mutationGroup.getMutations(), mutationsProto);
return BatchWriteRequest.MutationGroup.newBuilder().addAllMutations(mutationsProto).build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public CommitResponse writeAtLeastOnceWithOptions(
throws SpannerException {
setActive(null);
List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
Mutation.toProto(mutations, mutationsProto);
Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
Options options = Options.fromTransactionOptions(transactionOptions);
final CommitRequest.Builder requestBuilder =
CommitRequest.newBuilder()
Expand Down Expand Up @@ -431,19 +431,23 @@ public void close() {
}
}

ApiFuture<ByteString> beginTransactionAsync(
ApiFuture<Transaction> beginTransactionAsync(
Options transactionOptions,
boolean routeToLeader,
Map<SpannerRpc.Option, ?> channelHint,
ByteString previousTransactionId) {
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
ByteString previousTransactionId,
com.google.spanner.v1.Mutation mutation) {
final SettableApiFuture<Transaction> res = SettableApiFuture.create();
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
final BeginTransactionRequest request =
BeginTransactionRequest.Builder requestBuilder =
BeginTransactionRequest.newBuilder()
.setSession(getName())
.setOptions(
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
.build();
createReadWriteTransactionOptions(transactionOptions, previousTransactionId));
if (sessionReference.getIsMultiplexed() && mutation != null) {
requestBuilder.setMutationKey(mutation);
}
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
try (IScope ignore = tracer.withSpan(span)) {
requestFuture = spanner.getRpc().beginTransactionAsync(request, channelHint, routeToLeader);
Expand All @@ -457,7 +461,7 @@ ApiFuture<ByteString> beginTransactionAsync(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
}
span.end();
res.set(txn.getId());
res.set(txn);
} catch (ExecutionException e) {
span.setStatus(e);
span.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ void ensureTxn() {
ApiFuture<Void> ensureTxnAsync() {
final SettableApiFuture<Void> res = SettableApiFuture.create();
if (transactionId == null || isAborted()) {
createTxnAsync(res);
createTxnAsync(res, null);
} else {
span.addAnnotation("Transaction Initialized", "Id", transactionId.toStringUtf8());
txnLogger.log(
Expand All @@ -301,20 +301,29 @@ ApiFuture<Void> ensureTxnAsync() {
return res;
}

private void createTxnAsync(final SettableApiFuture<Void> res) {
private void createTxnAsync(
final SettableApiFuture<Void> res, com.google.spanner.v1.Mutation mutation) {
span.addAnnotation("Creating Transaction");
final ApiFuture<ByteString> fut =
final ApiFuture<Transaction> fut =
session.beginTransactionAsync(
options, isRouteToLeader(), getTransactionChannelHint(), getPreviousTransactionId());
options,
isRouteToLeader(),
getTransactionChannelHint(),
getPreviousTransactionId(),
mutation);
fut.addListener(
() -> {
try {
transactionId = fut.get();
Transaction txn = fut.get();
transactionId = txn.getId();
span.addAnnotation("Transaction Creation Done", "Id", transactionId.toStringUtf8());
txnLogger.log(
Level.FINER,
"Started transaction {0}",
txnLogger.isLoggable(Level.FINER) ? transactionId.asReadOnlyByteBuffer() : null);
if (txn.hasPrecommitToken()) {
onPrecommitToken(txn.getPrecommitToken());
}
res.set(null);
} catch (ExecutionException e) {
span.addAnnotation(
Expand Down Expand Up @@ -357,13 +366,14 @@ ApiFuture<CommitResponse> commitAsync() {
close();

List<com.google.spanner.v1.Mutation> mutationsProto = new ArrayList<>();
com.google.spanner.v1.Mutation randomMutation = null;
synchronized (committingLock) {
if (committing) {
throw new IllegalStateException(TRANSACTION_ALREADY_COMMITTED_MESSAGE);
}
committing = true;
if (!mutations.isEmpty()) {
Mutation.toProto(mutations, mutationsProto);
randomMutation = Mutation.toProtoAndReturnRandomMutation(mutations, mutationsProto);
}
}
final SettableApiFuture<CommitResponse> res = SettableApiFuture.create();
Expand Down Expand Up @@ -392,7 +402,7 @@ ApiFuture<CommitResponse> commitAsync() {
synchronized (lock) {
if (transactionIdFuture == null && transactionId == null && runningAsyncOperations == 0) {
finishOps = SettableApiFuture.create();
createTxnAsync(finishOps);
createTxnAsync(finishOps, randomMutation);
} else {
finishOps = finishedAsyncOperations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1828,7 +1828,7 @@ private ByteString getTransactionId(Session session, TransactionSelector tx) {
transactionId = null;
break;
case BEGIN:
transactionId = beginTransaction(session, tx.getBegin()).getId();
transactionId = beginTransaction(session, tx.getBegin(), null).getId();
break;
case ID:
Transaction transaction = transactions.get(tx.getId());
Expand Down Expand Up @@ -1883,7 +1883,8 @@ public void beginTransaction(
try {
beginTransactionExecutionTime.simulateExecutionTime(
exceptions, stickyGlobalExceptions, freezeLock);
Transaction transaction = beginTransaction(session, request.getOptions());
Transaction transaction =
beginTransaction(session, request.getOptions(), request.getMutationKey());
responseObserver.onNext(transaction);
responseObserver.onCompleted();
} catch (StatusRuntimeException t) {
Expand All @@ -1893,12 +1894,19 @@ public void beginTransaction(
}
}

private Transaction beginTransaction(Session session, TransactionOptions options) {
Transaction.Builder builder =
Transaction.newBuilder().setId(generateTransactionName(session.getName()));
private Transaction beginTransaction(
Session session, TransactionOptions options, com.google.spanner.v1.Mutation mutationKey) {
ByteString transactionId = generateTransactionName(session.getName());
Transaction.Builder builder = Transaction.newBuilder().setId(transactionId);
if (options != null && options.getModeCase() == ModeCase.READ_ONLY) {
setReadTimestamp(options, builder);
}
if (session.getMultiplexed()
&& options.getModeCase() == ModeCase.READ_WRITE
&& mutationKey != null) {
// Mutation only case in a read-write transaction.
builder.setPrecommitToken(getTransactionPrecommitToken(transactionId));
}
Transaction transaction = builder.build();
transactions.put(transaction.getId(), transaction);
transactionsStarted.add(transaction.getId());
Expand Down Expand Up @@ -2005,7 +2013,8 @@ public void commit(CommitRequest request, StreamObserver<CommitResponse> respons
session,
TransactionOptions.newBuilder()
.setReadWrite(ReadWrite.getDefaultInstance())
.build());
.build(),
null);
} else if (request.getTransactionId() != null) {
transaction = transactions.get(request.getTransactionId());
Optional<Boolean> aborted =
Expand Down Expand Up @@ -2490,6 +2499,10 @@ Session getSession(String name) {
return null;
}

static MultiplexedSessionPrecommitToken getTransactionPrecommitToken(ByteString transactionId) {
return getPrecommitToken("TransactionPrecommitToken", transactionId);
}

static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) {
return getPrecommitToken("ResultSetPrecommitToken", transactionId);
}
Expand Down
Loading
Loading