Skip to content

Commit d366c1c

Browse files
authored
Merge branch 'googleapis:main' into interval_support
2 parents df6ebe7 + 16cc6ee commit d366c1c

27 files changed

+843
-49
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import com.google.spanner.v1.ExecuteSqlRequest;
4949
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
5050
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
51+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
5152
import com.google.spanner.v1.PartialResultSet;
5253
import com.google.spanner.v1.ReadRequest;
5354
import com.google.spanner.v1.RequestOptions;
@@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) {
893894
this.session.onReadDone();
894895
}
895896

897+
/**
898+
* For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be
899+
* present in the RPC response. In such cases, this method will be a no-op.
900+
*/
901+
@Override
902+
public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {}
903+
896904
private ResultSet readInternal(
897905
String table,
898906
@Nullable String index,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.protobuf.ListValue;
2828
import com.google.protobuf.ProtocolMessageEnum;
2929
import com.google.protobuf.Value.KindCase;
30+
import com.google.spanner.v1.MultiplexedSessionPrecommitToken;
3031
import com.google.spanner.v1.Transaction;
3132
import java.io.IOException;
3233
import java.io.Serializable;
@@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId)
5758

5859
/** Called when the read finishes normally. */
5960
void onDone(boolean withBeginTransaction);
61+
62+
/**
63+
* Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token
64+
* will be included if the read-write transaction is executed on a multiplexed session.
65+
*/
66+
void onPrecommitToken(MultiplexedSessionPrecommitToken token);
6067
}
6168

6269
static final class LazyByteArray implements Serializable {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.common.base.MoreObjects;
2929
import com.google.common.base.Preconditions;
3030
import com.google.common.util.concurrent.MoreExecutors;
31+
import com.google.protobuf.ByteString;
3132

3233
/** Implementation of {@link AsyncTransactionManager}. */
3334
final class AsyncTransactionManagerImpl
@@ -80,7 +81,19 @@ public TransactionContextFutureImpl beginAsync() {
8081

8182
private ApiFuture<TransactionContext> internalBeginAsync(boolean firstAttempt) {
8283
txnState = TransactionState.STARTED;
83-
txn = session.newTransaction(options);
84+
85+
// Determine the latest transactionId when using a multiplexed session.
86+
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
87+
if (txn != null && session.getIsMultiplexed() && !firstAttempt) {
88+
// Use the current transactionId if available, otherwise fallback to the previous aborted
89+
// transactionId.
90+
multiplexedSessionPreviousTransactionId =
91+
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
92+
}
93+
94+
txn =
95+
session.newTransaction(
96+
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
8497
if (firstAttempt) {
8598
session.setActive(this);
8699
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/BuiltInOpenTelemetryMetricsProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_HASH_KEY;
2121
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_NAME_KEY;
2222
import static com.google.cloud.spanner.BuiltInMetricsConstant.CLIENT_UID_KEY;
23+
import static com.google.cloud.spanner.BuiltInMetricsConstant.DIRECT_PATH_ENABLED_KEY;
2324
import static com.google.cloud.spanner.BuiltInMetricsConstant.INSTANCE_CONFIG_ID_KEY;
2425
import static com.google.cloud.spanner.BuiltInMetricsConstant.LOCATION_ID_KEY;
2526
import static com.google.cloud.spanner.BuiltInMetricsConstant.PROJECT_ID_KEY;
@@ -83,6 +84,7 @@ Map<String, String> createClientAttributes(String projectId, String client_name)
8384
clientAttributes.put(LOCATION_ID_KEY.getKey(), detectClientLocation());
8485
clientAttributes.put(PROJECT_ID_KEY.getKey(), projectId);
8586
// TODO: Replace this with real value.
87+
clientAttributes.put(DIRECT_PATH_ENABLED_KEY.getKey(), "false");
8688
clientAttributes.put(INSTANCE_CONFIG_ID_KEY.getKey(), "unknown");
8789
clientAttributes.put(CLIENT_NAME_KEY.getKey(), client_name);
8890
String clientUid = getDefaultTaskValue();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/CompositeTracer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ public void addAttributes(Map<String, String> attributes) {
184184
for (ApiTracer child : children) {
185185
if (child instanceof MetricsTracer) {
186186
MetricsTracer metricsTracer = (MetricsTracer) child;
187-
attributes.forEach((key, value) -> metricsTracer.addAttributes(key, value));
187+
metricsTracer.addAttributes(attributes);
188188
}
189189
}
190190
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class DatabaseClientImpl implements DatabaseClient {
5959
/* useMultiplexedSessionBlindWrite = */ false,
6060
/* multiplexedSessionDatabaseClient = */ null,
6161
tracer,
62-
false);
62+
/* useMultiplexedSessionForRW = */ false);
6363
}
6464

6565
DatabaseClientImpl(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufR
4747

4848
GrpcResultSet(
4949
CloseableIterator<PartialResultSet> iterator, Listener listener, DecodeMode decodeMode) {
50-
this.iterator = new GrpcValueIterator(iterator);
50+
this.iterator = new GrpcValueIterator(iterator, listener);
5151
this.listener = listener;
5252
this.decodeMode = decodeMode;
5353
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static com.google.common.base.Preconditions.checkState;
2121

2222
import com.google.cloud.spanner.AbstractResultSet.CloseableIterator;
23+
import com.google.cloud.spanner.AbstractResultSet.Listener;
2324
import com.google.common.collect.AbstractIterator;
2425
import com.google.protobuf.ListValue;
2526
import com.google.protobuf.Value.KindCase;
@@ -44,9 +45,11 @@ private enum StreamValue {
4445
private PartialResultSet current;
4546
private int pos;
4647
private ResultSetStats statistics;
48+
private final Listener listener;
4749

48-
GrpcValueIterator(CloseableIterator<PartialResultSet> stream) {
50+
GrpcValueIterator(CloseableIterator<PartialResultSet> stream, Listener listener) {
4951
this.stream = stream;
52+
this.listener = listener;
5053
}
5154

5255
@SuppressWarnings("unchecked")
@@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException {
154157
ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e);
155158
}
156159
}
160+
// collect the precommit token from each PartialResultSet
161+
if (current.hasPrecommitToken()) {
162+
listener.onPrecommitToken(current.getPrecommitToken());
163+
}
157164
if (current.hasStats()) {
158165
statistics = current.getStats();
159166
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ static void throwIfTransactionsPending() {
6969
}
7070
}
7171

72-
static TransactionOptions createReadWriteTransactionOptions(Options options) {
72+
static TransactionOptions createReadWriteTransactionOptions(
73+
Options options, ByteString previousTransactionId) {
7374
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
7475
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
7576
transactionOptions.setExcludeTxnFromChangeStreams(true);
@@ -78,6 +79,10 @@ static TransactionOptions createReadWriteTransactionOptions(Options options) {
7879
if (options.withOptimisticLock() == Boolean.TRUE) {
7980
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
8081
}
82+
if (previousTransactionId != null
83+
&& previousTransactionId != com.google.protobuf.ByteString.EMPTY) {
84+
readWrite.setMultiplexedSessionPreviousTransactionId(previousTransactionId);
85+
}
8186
transactionOptions.setReadWrite(readWrite);
8287
return transactionOptions.build();
8388
}
@@ -427,13 +432,17 @@ public void close() {
427432
}
428433

429434
ApiFuture<ByteString> beginTransactionAsync(
430-
Options transactionOptions, boolean routeToLeader, Map<SpannerRpc.Option, ?> channelHint) {
435+
Options transactionOptions,
436+
boolean routeToLeader,
437+
Map<SpannerRpc.Option, ?> channelHint,
438+
ByteString previousTransactionId) {
431439
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
432440
final ISpan span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION);
433441
final BeginTransactionRequest request =
434442
BeginTransactionRequest.newBuilder()
435443
.setSession(getName())
436-
.setOptions(createReadWriteTransactionOptions(transactionOptions))
444+
.setOptions(
445+
createReadWriteTransactionOptions(transactionOptions, previousTransactionId))
437446
.build();
438447
final ApiFuture<Transaction> requestFuture;
439448
try (IScope ignore = tracer.withSpan(span)) {
@@ -469,11 +478,12 @@ ApiFuture<ByteString> beginTransactionAsync(
469478
return res;
470479
}
471480

472-
TransactionContextImpl newTransaction(Options options) {
481+
TransactionContextImpl newTransaction(Options options, ByteString previousTransactionId) {
473482
return TransactionContextImpl.newBuilder()
474483
.setSession(this)
475484
.setOptions(options)
476485
.setTransactionId(null)
486+
.setPreviousTransactionId(previousTransactionId)
477487
.setOptions(options)
478488
.setTrackTransactionStarter(spanner.getOptions().isTrackTransactionStarter())
479489
.setRpc(spanner.getRpc())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1726,14 +1726,13 @@ private ApiTracerFactory getDefaultApiTracerFactory() {
17261726
private ApiTracerFactory createMetricsApiTracerFactory() {
17271727
OpenTelemetry openTelemetry =
17281728
this.builtInOpenTelemetryMetricsProvider.getOrCreateOpenTelemetry(
1729-
getDefaultProjectId(), getCredentials());
1729+
this.getProjectId(), getCredentials());
17301730

17311731
return openTelemetry != null
17321732
? new MetricsTracerFactory(
17331733
new OpenTelemetryMetricsRecorder(openTelemetry, BuiltInMetricsConstant.METER_NAME),
17341734
builtInOpenTelemetryMetricsProvider.createClientAttributes(
1735-
getDefaultProjectId(),
1736-
"spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
1735+
this.getProjectId(), "spanner-java/" + GaxProperties.getLibraryVersion(getClass())))
17371736
: null;
17381737
}
17391738

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionManagerImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.cloud.spanner.Options.TransactionOption;
2121
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
2222
import com.google.common.base.Preconditions;
23+
import com.google.protobuf.ByteString;
2324

2425
/** Implementation of {@link TransactionManager}. */
2526
final class TransactionManagerImpl implements TransactionManager, SessionTransaction {
@@ -53,7 +54,7 @@ public void setSpan(ISpan span) {
5354
public TransactionContext begin() {
5455
Preconditions.checkState(txn == null, "begin can only be called once");
5556
try (IScope s = tracer.withSpan(span)) {
56-
txn = session.newTransaction(options);
57+
txn = session.newTransaction(options, /* previousTransactionId = */ ByteString.EMPTY);
5758
session.setActive(this);
5859
txnState = TransactionState.STARTED;
5960
return txn;
@@ -102,7 +103,18 @@ public TransactionContext resetForRetry() {
102103
}
103104
try (IScope s = tracer.withSpan(span)) {
104105
boolean useInlinedBegin = txn.transactionId != null;
105-
txn = session.newTransaction(options);
106+
107+
// Determine the latest transactionId when using a multiplexed session.
108+
ByteString multiplexedSessionPreviousTransactionId = ByteString.EMPTY;
109+
if (session.getIsMultiplexed()) {
110+
// Use the current transactionId if available, otherwise fallback to the previous aborted
111+
// transactionId.
112+
multiplexedSessionPreviousTransactionId =
113+
txn.transactionId != null ? txn.transactionId : txn.getPreviousTransactionId();
114+
}
115+
txn =
116+
session.newTransaction(
117+
options, /* previousTransactionId = */ multiplexedSessionPreviousTransactionId);
106118
if (!useInlinedBegin) {
107119
txn.ensureTxn();
108120
}

0 commit comments

Comments
 (0)