Skip to content

Commit d21cc73

Browse files
committed
chore(x-goog-spanner-request-id): add BeginTransaction+ResumableStreamIterator
Plumbs x-goog-spanner-request-id into BeginTransaction and ResumableStreamIterator and for PartitionedDmlTransaction. Updates #3537
1 parent db0ed07 commit d21cc73

File tree

8 files changed

+91
-36
lines changed

8 files changed

+91
-36
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -457,9 +457,12 @@ void initTransaction() {
457457
}
458458

459459
private void initTransactionInternal(BeginTransactionRequest request) {
460+
XGoogSpannerRequestId reqId =
461+
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
460462
try {
461463
Transaction transaction =
462-
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
464+
rpc.beginTransaction(
465+
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
463466
if (!transaction.hasReadTimestamp()) {
464467
throw SpannerExceptionFactory.newSpannerException(
465468
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
@@ -803,7 +806,8 @@ ResultSet executeQueryInternalWithOptions(
803806
tracer.createStatementAttributes(statement, options),
804807
session.getErrorHandler(),
805808
rpc.getExecuteQueryRetrySettings(),
806-
rpc.getExecuteQueryRetryableCodes()) {
809+
rpc.getExecuteQueryRetryableCodes(),
810+
session.getRequestIdCreator()) {
807811
@Override
808812
CloseableIterator<PartialResultSet> startStream(
809813
@Nullable ByteString resumeToken,
@@ -826,11 +830,12 @@ CloseableIterator<PartialResultSet> startStream(
826830
if (selector != null) {
827831
request.setTransaction(selector);
828832
}
833+
this.incrementXGoogRequestIdAttempt();
829834
SpannerRpc.StreamingCall call =
830835
rpc.executeQuery(
831836
request.build(),
832837
stream.consumer(),
833-
getTransactionChannelHint(),
838+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
834839
isRouteToLeader());
835840
session.markUsed(clock.instant());
836841
stream.setCall(call, request.getTransaction().hasBegin());
@@ -1008,7 +1013,8 @@ ResultSet readInternalWithOptions(
10081013
tracer.createTableAttributes(table, readOptions),
10091014
session.getErrorHandler(),
10101015
rpc.getReadRetrySettings(),
1011-
rpc.getReadRetryableCodes()) {
1016+
rpc.getReadRetryableCodes(),
1017+
session.getRequestIdCreator()) {
10121018
@Override
10131019
CloseableIterator<PartialResultSet> startStream(
10141020
@Nullable ByteString resumeToken,
@@ -1029,11 +1035,12 @@ CloseableIterator<PartialResultSet> startStream(
10291035
builder.setTransaction(selector);
10301036
}
10311037
builder.setRequestOptions(buildRequestOptions(readOptions));
1038+
this.incrementXGoogRequestIdAttempt();
10321039
SpannerRpc.StreamingCall call =
10331040
rpc.read(
10341041
builder.build(),
10351042
stream.consumer(),
1036-
getTransactionChannelHint(),
1043+
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
10371044
isRouteToLeader());
10381045
session.markUsed(clock.instant());
10391046
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,16 +80,22 @@ long executeStreamingPartitionedUpdate(
8080
long updateCount = 0L;
8181
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
8282
Options options = Options.fromUpdateOptions(updateOptions);
83+
XGoogSpannerRequestId reqId = options.reqId();
84+
if (reqId == null) {
85+
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
86+
}
8387

8488
try {
8589
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
8690

8791
while (true) {
92+
reqId.incrementAttempt();
8893
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
8994

9095
try {
9196
ServerStream<PartialResultSet> stream =
92-
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
97+
rpc.executeStreamingPartitionedDml(
98+
request, reqId.withOptions(session.getOptions()), remainingTimeout);
9399

94100
for (PartialResultSet rs : stream) {
95101
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
@@ -119,12 +125,15 @@ long executeStreamingPartitionedUpdate(
119125
foundStats = false;
120126
updateCount = 0L;
121127
request = newTransactionRequestFrom(statement, options);
128+
// Create a new xGoogSpannerRequestId.
129+
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
122130
}
123131
}
124132
if (!foundStats) {
125133
throw SpannerExceptionFactory.newSpannerException(
126134
ErrorCode.INVALID_ARGUMENT,
127-
"Partitioned DML response missing stats possibly due to non-DML statement as input");
135+
"Partitioned DML response missing stats possibly due to non-DML statement as input",
136+
reqId);
128137
}
129138
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
130139
return updateCount;
@@ -209,7 +218,11 @@ private ByteString initTransaction(final Options options) {
209218
.setExcludeTxnFromChangeStreams(
210219
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
211220
.build();
212-
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
221+
XGoogSpannerRequestId reqId = options.reqId();
222+
if (reqId == null) {
223+
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 1);
224+
}
225+
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
213226
if (tx.getId().isEmpty()) {
214227
throw SpannerExceptionFactory.newSpannerException(
215228
ErrorCode.INTERNAL,

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
7171
private CloseableIterator<PartialResultSet> stream;
7272
private ByteString resumeToken;
7373
private boolean finished;
74+
public XGoogSpannerRequestId xGoogRequestId;
75+
private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;
7476

7577
/**
7678
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
@@ -86,7 +88,8 @@ protected ResumableStreamIterator(
8688
TraceWrapper tracer,
8789
ErrorHandler errorHandler,
8890
RetrySettings streamingRetrySettings,
89-
Set<Code> retryableCodes) {
91+
Set<Code> retryableCodes,
92+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
9093
this(
9194
maxBufferSize,
9295
streamName,
@@ -95,7 +98,8 @@ protected ResumableStreamIterator(
9598
Attributes.empty(),
9699
errorHandler,
97100
streamingRetrySettings,
98-
retryableCodes);
101+
retryableCodes,
102+
xGoogRequestIdCreator);
99103
}
100104

101105
protected ResumableStreamIterator(
@@ -106,14 +110,16 @@ protected ResumableStreamIterator(
106110
Attributes attributes,
107111
ErrorHandler errorHandler,
108112
RetrySettings streamingRetrySettings,
109-
Set<Code> retryableCodes) {
113+
Set<Code> retryableCodes,
114+
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
110115
checkArgument(maxBufferSize >= 0);
111116
this.maxBufferSize = maxBufferSize;
112117
this.tracer = tracer;
113118
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
114119
this.errorHandler = errorHandler;
115120
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
116121
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
122+
this.xGoogRequestIdCreator = xGoogRequestIdCreator;
117123
}
118124

119125
private ExponentialBackOff newBackOff() {
@@ -190,6 +196,14 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
190196
}
191197
}
192198

199+
public void incrementXGoogRequestIdAttempt() {
200+
if (this.xGoogRequestId == null) {
201+
this.xGoogRequestId =
202+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 0 /*attempt*/);
203+
}
204+
this.xGoogRequestId.incrementAttempt();
205+
}
206+
193207
private enum DirectExecutor implements Executor {
194208
INSTANCE;
195209

@@ -281,6 +295,7 @@ protected PartialResultSet computeNext() {
281295
}
282296
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
283297
stream = null;
298+
incrementXGoogRequestIdAttempt();
284299
try (IScope s = tracer.withSpan(span)) {
285300
long delay = spannerException.getRetryDelayInMillis();
286301
if (delay != -1) {
@@ -302,6 +317,7 @@ protected PartialResultSet computeNext() {
302317
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
303318
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
304319
stream = null;
320+
xGoogRequestId = null;
305321
continue;
306322
}
307323
}
@@ -328,6 +344,10 @@ private void startGrpcStreaming() {
328344
// this Span.
329345
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
330346
stream.requestPrefetchChunks();
347+
if (this.xGoogRequestId == null) {
348+
this.xGoogRequestId =
349+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: retrieve channelId*/, 0);
350+
}
331351
}
332352
}
333353
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -300,11 +300,12 @@ public CommitResponse writeAtLeastOnceWithOptions(
300300
}
301301
CommitRequest request = requestBuilder.build();
302302
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
303-
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
304303

305304
try (IScope s = tracer.withSpan(span)) {
306305
return SpannerRetryHelper.runTxWithRetriesOnAborted(
307306
() -> {
307+
// On Aborted, we have to start a fresh request id.
308+
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
308309
return new CommitResponse(
309310
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
310311
});
@@ -516,7 +517,7 @@ ApiFuture<Transaction> beginTransactionAsync(
516517
Transaction txn = requestFuture.get();
517518
if (txn.getId().isEmpty()) {
518519
throw newSpannerException(
519-
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
520+
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId);
520521
}
521522
span.end();
522523
res.set(txn);
@@ -525,7 +526,7 @@ ApiFuture<Transaction> beginTransactionAsync(
525526
span.end();
526527
res.setException(
527528
SpannerExceptionFactory.newSpannerException(
528-
e.getCause() == null ? e : e.getCause()));
529+
e.getCause() == null ? e : e.getCause(), reqId));
529530
} catch (InterruptedException e) {
530531
span.setStatus(e);
531532
span.end();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,8 @@ private final class CommitRunnable implements Runnable {
449449

450450
@Override
451451
public void run() {
452+
XGoogSpannerRequestId reqId =
453+
session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1);
452454
try {
453455
prev.get();
454456
if (transactionId == null && transactionIdFuture == null) {
@@ -491,7 +493,8 @@ public void run() {
491493
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture;
492494
final ISpan opSpan = tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span);
493495
try (IScope ignore = tracer.withSpan(opSpan)) {
494-
commitFuture = rpc.commitAsync(commitRequest, getTransactionChannelHint());
496+
commitFuture =
497+
rpc.commitAsync(commitRequest, reqId.withOptions(getTransactionChannelHint()));
495498
}
496499
session.markUsed(clock.instant());
497500
commitFuture.addListener(
@@ -502,7 +505,7 @@ public void run() {
502505
// future, but we add a result here as well as a safety precaution.
503506
res.setException(
504507
SpannerExceptionFactory.newSpannerException(
505-
ErrorCode.INTERNAL, "commitFuture is not done"));
508+
ErrorCode.INTERNAL, "commitFuture is not done", reqId));
506509
return;
507510
}
508511
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
@@ -532,7 +535,9 @@ public void run() {
532535
}
533536
if (!proto.hasCommitTimestamp()) {
534537
throw newSpannerException(
535-
ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName());
538+
ErrorCode.INTERNAL,
539+
"Missing commitTimestamp:\n" + session.getName(),
540+
reqId);
536541
}
537542
span.addAnnotation("Commit Done");
538543
opSpan.end();
@@ -568,7 +573,8 @@ public void run() {
568573
res.setException(SpannerExceptionFactory.propagateTimeout(e));
569574
} catch (Throwable e) {
570575
res.setException(
571-
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
576+
SpannerExceptionFactory.newSpannerException(
577+
e.getCause() == null ? e : e.getCause(), reqId));
572578
}
573579
}
574580
}
@@ -1056,9 +1062,11 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
10561062
}
10571063
final ExecuteBatchDmlRequest.Builder builder =
10581064
getExecuteBatchDmlRequestBuilder(statements, options);
1065+
XGoogSpannerRequestId reqId =
1066+
session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1);
10591067
try {
10601068
com.google.spanner.v1.ExecuteBatchDmlResponse response =
1061-
rpc.executeBatchDml(builder.build(), getTransactionChannelHint());
1069+
rpc.executeBatchDml(builder.build(), reqId.withOptions(getTransactionChannelHint()));
10621070
session.markUsed(clock.instant());
10631071
long[] results = new long[response.getResultSetsCount()];
10641072
for (int i = 0; i < response.getResultSetsCount(); ++i) {
@@ -1083,7 +1091,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... update
10831091
ErrorCode.fromRpcStatus(response.getStatus()),
10841092
response.getStatus().getMessage(),
10851093
results,
1086-
null /*TODO: requestId*/);
1094+
reqId);
10871095
}
10881096
return results;
10891097
} catch (Throwable e) {
@@ -1116,11 +1124,15 @@ public ApiFuture<long[]> batchUpdateAsync(
11161124
final ExecuteBatchDmlRequest.Builder builder =
11171125
getExecuteBatchDmlRequestBuilder(statements, options);
11181126
ApiFuture<com.google.spanner.v1.ExecuteBatchDmlResponse> response;
1127+
XGoogSpannerRequestId reqId =
1128+
session.getRequestIdCreator().nextRequestId(1 /*TODO: channelId */, 1);
11191129
try {
11201130
// Register the update as an async operation that must finish before the transaction may
11211131
// commit.
11221132
increaseAsyncOperations();
1123-
response = rpc.executeBatchDmlAsync(builder.build(), getTransactionChannelHint());
1133+
response =
1134+
rpc.executeBatchDmlAsync(
1135+
builder.build(), reqId.withOptions(getTransactionChannelHint()));
11241136
session.markUsed(clock.instant());
11251137
} catch (Throwable t) {
11261138
decreaseAsyncOperations();
@@ -1151,7 +1163,7 @@ public ApiFuture<long[]> batchUpdateAsync(
11511163
ErrorCode.fromRpcStatus(batchDmlResponse.getStatus()),
11521164
batchDmlResponse.getStatus().getMessage(),
11531165
results,
1154-
null /*TODO: requestId*/);
1166+
reqId);
11551167
}
11561168
return results;
11571169
},

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -228,20 +228,19 @@ public static void startStaticServer() throws IOException {
228228
Set<String> checkMethods =
229229
new HashSet(
230230
Arrays.asList(
231-
"google.spanner.v1.Spanner/BatchCreateSessions"
231+
"google.spanner.v1.Spanner/BatchCreateSessions",
232232
// As functionality is added, uncomment each method.
233-
// "google.spanner.v1.Spanner/BatchWrite",
234-
// "google.spanner.v1.Spanner/BeginTransaction",
235-
// "google.spanner.v1.Spanner/CreateSession",
236-
// "google.spanner.v1.Spanner/DeleteSession",
237-
// "google.spanner.v1.Spanner/ExecuteBatchDml",
238-
// "google.spanner.v1.Spanner/ExecuteSql",
239-
// "google.spanner.v1.Spanner/ExecuteStreamingSql",
240-
// "google.spanner.v1.Spanner/StreamingRead",
241-
// "google.spanner.v1.Spanner/PartitionQuery",
242-
// "google.spanner.v1.Spanner/PartitionRead",
243-
// "google.spanner.v1.Spanner/Commit",
244-
));
233+
"google.spanner.v1.Spanner/BatchWrite",
234+
"google.spanner.v1.Spanner/BeginTransaction",
235+
"google.spanner.v1.Spanner/CreateSession",
236+
"google.spanner.v1.Spanner/DeleteSession",
237+
"google.spanner.v1.Spanner/ExecuteBatchDml",
238+
"google.spanner.v1.Spanner/ExecuteSql",
239+
"google.spanner.v1.Spanner/ExecuteStreamingSql",
240+
"google.spanner.v1.Spanner/StreamingRead",
241+
"google.spanner.v1.Spanner/PartitionQuery",
242+
"google.spanner.v1.Spanner/PartitionRead",
243+
"google.spanner.v1.Spanner/Commit"));
245244
xGoogReqIdInterceptor = new XGoogSpannerRequestIdTest.ServerHeaderEnforcer(checkMethods);
246245
executor = Executors.newSingleThreadExecutor();
247246
String uniqueName = InProcessServerBuilder.generateName();

google-cloud-spanner/src/test/java/com/google/cloud/spanner/PartitionedDmlTransactionTest.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ public class PartitionedDmlTransactionTest {
9696
public void setup() {
9797
MockitoAnnotations.initMocks(this);
9898
when(session.getName()).thenReturn(sessionId);
99+
when(session.getRequestIdCreator())
100+
.thenReturn(new XGoogSpannerRequestId.NoopRequestIdCreator());
99101
when(session.getOptions()).thenReturn(Collections.EMPTY_MAP);
100102
when(rpc.beginTransaction(any(BeginTransactionRequest.class), anyMap(), eq(true)))
101103
.thenReturn(Transaction.newBuilder().setId(txId).build());

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ private void initWithLimit(int maxBufferSize) {
162162
new TraceWrapper(Tracing.getTracer(), OpenTelemetry.noop().getTracer(""), false),
163163
DefaultErrorHandler.INSTANCE,
164164
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings(),
165-
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) {
165+
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes(),
166+
new XGoogSpannerRequestId.NoopRequestIdCreator()) {
166167
@Override
167168
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
168169
@Nullable ByteString resumeToken,

0 commit comments

Comments
 (0)