Skip to content

Commit 0f3ca8a

Browse files
committed
Add x-goog-spanner-route-to-leader header to Spanner RPC contexts for RW/PDML transactions.
The header is added to support leader-aware-routing feature, which aims at reducing cross-regional latency for RW/PDML transactions in a multi-region instance.
1 parent a40bda9 commit 0f3ca8a

15 files changed

+198
-73
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ static Builder newBuilder() {
161161
private SingleReadContext(Builder builder) {
162162
super(builder);
163163
this.bound = builder.bound;
164+
this.routeToLeader = false;
164165
}
165166

166167
@GuardedBy("lock")
@@ -291,6 +292,7 @@ static Builder newBuilder() {
291292
this.timestamp = builder.timestamp;
292293
this.transactionId = builder.transactionId;
293294
}
295+
this.routeToLeader = false;
294296
}
295297

296298
@Override
@@ -347,7 +349,8 @@ void initTransaction() {
347349
.setSession(session.getName())
348350
.setOptions(options)
349351
.build();
350-
Transaction transaction = rpc.beginTransaction(request, session.getOptions());
352+
Transaction transaction =
353+
rpc.beginTransaction(request, session.getOptions(), routeToLeader);
351354
if (!transaction.hasReadTimestamp()) {
352355
throw SpannerExceptionFactory.newSpannerException(
353356
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
@@ -380,6 +383,7 @@ void initTransaction() {
380383
Span span;
381384
private final int defaultPrefetchChunks;
382385
private final QueryOptions defaultQueryOptions;
386+
protected boolean routeToLeader = false;
383387

384388
@GuardedBy("lock")
385389
private boolean isValid = true;
@@ -664,7 +668,8 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
664668
request.setTransaction(selector);
665669
}
666670
SpannerRpc.StreamingCall call =
667-
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
671+
rpc.executeQuery(
672+
request.build(), stream.consumer(), session.getOptions(), routeToLeader);
668673
call.request(prefetchChunks);
669674
stream.setCall(call, request.getTransaction().hasBegin());
670675
return stream;
@@ -792,7 +797,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
792797
}
793798
builder.setRequestOptions(buildRequestOptions(readOptions));
794799
SpannerRpc.StreamingCall call =
795-
rpc.read(builder.build(), stream.consumer(), session.getOptions());
800+
rpc.read(builder.build(), stream.consumer(), session.getOptions(), routeToLeader);
796801
call.request(prefetchChunks);
797802
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
798803
return stream;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private ByteString initTransaction() {
202202
TransactionOptions.newBuilder()
203203
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
204204
.build();
205-
Transaction tx = rpc.beginTransaction(request, session.getOptions());
205+
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
206206
if (tx.getId().isEmpty()) {
207207
throw SpannerExceptionFactory.newSpannerException(
208208
ErrorCode.INTERNAL,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -267,7 +267,7 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
267267
@Override
268268
public void prepareReadWriteTransaction() {
269269
setActive(null);
270-
readyTransactionId = beginTransaction();
270+
readyTransactionId = beginTransaction(true);
271271
}
272272

273273
@Override
@@ -288,17 +288,17 @@ public void close() {
288288
}
289289
}
290290

291-
ByteString beginTransaction() {
291+
ByteString beginTransaction(boolean routeToLeader) {
292292
try {
293-
return beginTransactionAsync().get();
293+
return beginTransactionAsync(routeToLeader).get();
294294
} catch (ExecutionException e) {
295295
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
296296
} catch (InterruptedException e) {
297297
throw SpannerExceptionFactory.propagateInterrupt(e);
298298
}
299299
}
300300

301-
ApiFuture<ByteString> beginTransactionAsync() {
301+
ApiFuture<ByteString> beginTransactionAsync(boolean routeToLeader) {
302302
final SettableApiFuture<ByteString> res = SettableApiFuture.create();
303303
final Span span = tracer.spanBuilder(SpannerImpl.BEGIN_TRANSACTION).startSpan();
304304
final BeginTransactionRequest request =
@@ -309,7 +309,7 @@ ApiFuture<ByteString> beginTransactionAsync() {
309309
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()))
310310
.build();
311311
final ApiFuture<Transaction> requestFuture =
312-
spanner.getRpc().beginTransactionAsync(request, options);
312+
spanner.getRpc().beginTransactionAsync(request, options, routeToLeader);
313313
requestFuture.addListener(
314314
tracer.withSpan(
315315
span,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ private TransactionContextImpl(Builder builder) {
196196
this.trackTransactionStarter = builder.trackTransactionStarter;
197197
this.options = builder.options;
198198
this.finishedAsyncOperations.set(null);
199+
this.routeToLeader = true;
199200
}
200201

201202
private void increaseAsyncOperations() {
@@ -255,7 +256,7 @@ ApiFuture<Void> ensureTxnAsync() {
255256

256257
private void createTxnAsync(final SettableApiFuture<Void> res) {
257258
span.addAnnotation("Creating Transaction");
258-
final ApiFuture<ByteString> fut = session.beginTransactionAsync();
259+
final ApiFuture<ByteString> fut = session.beginTransactionAsync(routeToLeader);
259260
fut.addListener(
260261
() -> {
261262
try {
@@ -719,7 +720,7 @@ private ResultSet internalExecuteUpdate(
719720
/* withTransactionSelector = */ true);
720721
try {
721722
com.google.spanner.v1.ResultSet resultSet =
722-
rpc.executeQuery(builder.build(), session.getOptions());
723+
rpc.executeQuery(builder.build(), session.getOptions(), routeToLeader);
723724
if (resultSet.getMetadata().hasTransaction()) {
724725
onTransactionMetadata(
725726
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
@@ -749,7 +750,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
749750
// Register the update as an async operation that must finish before the transaction may
750751
// commit.
751752
increaseAsyncOperations();
752-
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions());
753+
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), routeToLeader);
753754
} catch (Throwable t) {
754755
decreaseAsyncOperations();
755756
throw t;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1592,7 +1592,8 @@ public List<Session> batchCreateSessions(
15921592
requestBuilder.setSessionTemplate(sessionBuilder);
15931593
BatchCreateSessionsRequest request = requestBuilder.build();
15941594
GrpcCallContext context =
1595-
newCallContext(options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod());
1595+
newCallContext(
1596+
options, databaseName, request, SpannerGrpc.getBatchCreateSessionsMethod(), true);
15961597
return get(spannerStub.batchCreateSessionsCallable().futureCall(request, context))
15971598
.getSessionList();
15981599
}
@@ -1616,7 +1617,7 @@ public Session createSession(
16161617
requestBuilder.setSession(sessionBuilder);
16171618
CreateSessionRequest request = requestBuilder.build();
16181619
GrpcCallContext context =
1619-
newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod());
1620+
newCallContext(options, databaseName, request, SpannerGrpc.getCreateSessionMethod(), true);
16201621
return get(spannerStub.createSessionCallable().futureCall(request, context));
16211622
}
16221623

@@ -1630,15 +1631,19 @@ public void deleteSession(String sessionName, @Nullable Map<Option, ?> options)
16301631
public ApiFuture<Empty> asyncDeleteSession(String sessionName, @Nullable Map<Option, ?> options) {
16311632
DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(sessionName).build();
16321633
GrpcCallContext context =
1633-
newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod());
1634+
newCallContext(options, sessionName, request, SpannerGrpc.getDeleteSessionMethod(), false);
16341635
return spannerStub.deleteSessionCallable().futureCall(request, context);
16351636
}
16361637

16371638
@Override
16381639
public StreamingCall read(
1639-
ReadRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
1640+
ReadRequest request,
1641+
ResultStreamConsumer consumer,
1642+
@Nullable Map<Option, ?> options,
1643+
boolean routeToLeader) {
16401644
GrpcCallContext context =
1641-
newCallContext(options, request.getSession(), request, SpannerGrpc.getReadMethod());
1645+
newCallContext(
1646+
options, request.getSession(), request, SpannerGrpc.getReadMethod(), routeToLeader);
16421647
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
16431648
spannerStub.streamingReadCallable().call(request, responseObserver, context);
16441649
final StreamController controller = responseObserver.getController();
@@ -1658,13 +1663,14 @@ public void cancel(String message) {
16581663
}
16591664

16601665
@Override
1661-
public ResultSet executeQuery(ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
1662-
return get(executeQueryAsync(request, options));
1666+
public ResultSet executeQuery(
1667+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
1668+
return get(executeQueryAsync(request, options, routeToLeader));
16631669
}
16641670

16651671
@Override
16661672
public ApiFuture<ResultSet> executeQueryAsync(
1667-
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
1673+
ExecuteSqlRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
16681674
GrpcCallContext context =
16691675
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
16701676
return spannerStub.executeSqlCallable().futureCall(request, context);
@@ -1674,7 +1680,8 @@ public ApiFuture<ResultSet> executeQueryAsync(
16741680
public ResultSet executePartitionedDml(
16751681
ExecuteSqlRequest request, @Nullable Map<Option, ?> options) {
16761682
GrpcCallContext context =
1677-
newCallContext(options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod());
1683+
newCallContext(
1684+
options, request.getSession(), request, SpannerGrpc.getExecuteSqlMethod(), true);
16781685
return get(partitionedDmlStub.executeSqlCallable().futureCall(request, context));
16791686
}
16801687

@@ -1688,18 +1695,29 @@ public ServerStream<PartialResultSet> executeStreamingPartitionedDml(
16881695
ExecuteSqlRequest request, Map<Option, ?> options, Duration timeout) {
16891696
GrpcCallContext context =
16901697
newCallContext(
1691-
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
1698+
options,
1699+
request.getSession(),
1700+
request,
1701+
SpannerGrpc.getExecuteStreamingSqlMethod(),
1702+
true);
16921703
// Override any timeout settings that might have been set on the call context.
16931704
context = context.withTimeout(timeout).withStreamWaitTimeout(timeout);
16941705
return partitionedDmlStub.executeStreamingSqlCallable().call(request, context);
16951706
}
16961707

16971708
@Override
16981709
public StreamingCall executeQuery(
1699-
ExecuteSqlRequest request, ResultStreamConsumer consumer, @Nullable Map<Option, ?> options) {
1710+
ExecuteSqlRequest request,
1711+
ResultStreamConsumer consumer,
1712+
@Nullable Map<Option, ?> options,
1713+
boolean routeToLeader) {
17001714
GrpcCallContext context =
17011715
newCallContext(
1702-
options, request.getSession(), request, SpannerGrpc.getExecuteStreamingSqlMethod());
1716+
options,
1717+
request.getSession(),
1718+
request,
1719+
SpannerGrpc.getExecuteStreamingSqlMethod(),
1720+
routeToLeader);
17031721
SpannerResponseObserver responseObserver = new SpannerResponseObserver(consumer);
17041722
spannerStub.executeStreamingSqlCallable().call(request, responseObserver, context);
17051723
final StreamController controller = responseObserver.getController();
@@ -1729,30 +1747,35 @@ public ApiFuture<ExecuteBatchDmlResponse> executeBatchDmlAsync(
17291747
ExecuteBatchDmlRequest request, @Nullable Map<Option, ?> options) {
17301748
GrpcCallContext context =
17311749
newCallContext(
1732-
options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod());
1750+
options, request.getSession(), request, SpannerGrpc.getExecuteBatchDmlMethod(), true);
17331751
return spannerStub.executeBatchDmlCallable().futureCall(request, context);
17341752
}
17351753

17361754
@Override
17371755
public ApiFuture<Transaction> beginTransactionAsync(
1738-
BeginTransactionRequest request, @Nullable Map<Option, ?> options) {
1756+
BeginTransactionRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader) {
17391757
GrpcCallContext context =
17401758
newCallContext(
1741-
options, request.getSession(), request, SpannerGrpc.getBeginTransactionMethod());
1759+
options,
1760+
request.getSession(),
1761+
request,
1762+
SpannerGrpc.getBeginTransactionMethod(),
1763+
routeToLeader);
17421764
return spannerStub.beginTransactionCallable().futureCall(request, context);
17431765
}
17441766

17451767
@Override
17461768
public Transaction beginTransaction(
1747-
BeginTransactionRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
1748-
return get(beginTransactionAsync(request, options));
1769+
BeginTransactionRequest request, @Nullable Map<Option, ?> options, boolean routeToLeader)
1770+
throws SpannerException {
1771+
return get(beginTransactionAsync(request, options, routeToLeader));
17491772
}
17501773

17511774
@Override
17521775
public ApiFuture<CommitResponse> commitAsync(
17531776
CommitRequest request, @Nullable Map<Option, ?> options) {
17541777
GrpcCallContext context =
1755-
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod());
1778+
newCallContext(options, request.getSession(), request, SpannerGrpc.getCommitMethod(), true);
17561779
return spannerStub.commitCallable().futureCall(request, context);
17571780
}
17581781

@@ -1765,7 +1788,8 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option,
17651788
@Override
17661789
public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options) {
17671790
GrpcCallContext context =
1768-
newCallContext(options, request.getSession(), request, SpannerGrpc.getRollbackMethod());
1791+
newCallContext(
1792+
options, request.getSession(), request, SpannerGrpc.getRollbackMethod(), true);
17691793
return spannerStub.rollbackCallable().futureCall(request, context);
17701794
}
17711795

@@ -1780,7 +1804,7 @@ public PartitionResponse partitionQuery(
17801804
PartitionQueryRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
17811805
GrpcCallContext context =
17821806
newCallContext(
1783-
options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod());
1807+
options, request.getSession(), request, SpannerGrpc.getPartitionQueryMethod(), true);
17841808
return get(spannerStub.partitionQueryCallable().futureCall(request, context));
17851809
}
17861810

@@ -1789,7 +1813,7 @@ public PartitionResponse partitionRead(
17891813
PartitionReadRequest request, @Nullable Map<Option, ?> options) throws SpannerException {
17901814
GrpcCallContext context =
17911815
newCallContext(
1792-
options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod());
1816+
options, request.getSession(), request, SpannerGrpc.getPartitionReadMethod(), true);
17931817
return get(spannerStub.partitionReadCallable().futureCall(request, context));
17941818
}
17951819

@@ -1898,6 +1922,16 @@ <ReqT, RespT> GrpcCallContext newCallContext(
18981922
String resource,
18991923
ReqT request,
19001924
MethodDescriptor<ReqT, RespT> method) {
1925+
return newCallContext(options, resource, request, method, false);
1926+
}
1927+
1928+
@VisibleForTesting
1929+
<ReqT, RespT> GrpcCallContext newCallContext(
1930+
@Nullable Map<Option, ?> options,
1931+
String resource,
1932+
ReqT request,
1933+
MethodDescriptor<ReqT, RespT> method,
1934+
boolean routeToLeader) {
19011935
GrpcCallContext context = GrpcCallContext.createDefault();
19021936
if (options != null) {
19031937
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
@@ -1907,6 +1941,9 @@ <ReqT, RespT> GrpcCallContext newCallContext(
19071941
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
19081942
}
19091943
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
1944+
if (routeToLeader) {
1945+
context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader());
1946+
}
19101947
if (callCredentialsProvider != null) {
19111948
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
19121949
if (callCredentials != null) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerMetadataProvider.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
class SpannerMetadataProvider {
2929
private final Map<Metadata.Key<String>, String> headers;
3030
private final String resourceHeaderKey;
31-
31+
private static final String routeToLeaderHeaderKey = "x-goog-spanner-route-to-leader";
3232
private static final Pattern[] RESOURCE_TOKEN_PATTERNS = {
3333
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*/databases/[^/]*)(.*)?"),
3434
Pattern.compile("^(?<headerValue>projects/[^/]*/instances/[^/]*)(.*)?")
@@ -66,6 +66,12 @@ Map<String, List<String>> newExtraHeaders(
6666
.build();
6767
}
6868

69+
Map<String, List<String>> newRouteToLeaderHeader() {
70+
return ImmutableMap.<String, List<String>>builder()
71+
.put(routeToLeaderHeaderKey, Collections.singletonList("true"))
72+
.build();
73+
}
74+
6975
private Map<Metadata.Key<String>, String> constructHeadersAsMetadata(
7076
Map<String, String> headers) {
7177
ImmutableMap.Builder<Metadata.Key<String>, String> headersAsMetadataBuilder =

0 commit comments

Comments
 (0)