Skip to content

Commit 7449771

Browse files
committed
Merge from main
1 parent 5cc6b71 commit 7449771

File tree

5 files changed

+40
-22
lines changed

5 files changed

+40
-22
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -465,17 +465,22 @@ private void initTransactionInternal(BeginTransactionRequest request) {
465465
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
466466
if (!transaction.hasReadTimestamp()) {
467467
throw SpannerExceptionFactory.newSpannerException(
468-
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
468+
ErrorCode.INTERNAL,
469+
"Missing expected transaction.read_timestamp metadata field",
470+
reqId);
469471
}
470472
if (transaction.getId().isEmpty()) {
471473
throw SpannerExceptionFactory.newSpannerException(
472-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
474+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
473475
}
474476
try {
475477
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
476478
} catch (IllegalArgumentException e) {
477479
throw SpannerExceptionFactory.newSpannerException(
478-
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
480+
ErrorCode.INTERNAL,
481+
"Bad value in transaction.read_timestamp metadata field",
482+
e,
483+
reqId);
479484
}
480485
transactionId = transaction.getId();
481486
span.addAnnotation(
@@ -830,7 +835,7 @@ CloseableIterator<PartialResultSet> startStream(
830835
if (selector != null) {
831836
request.setTransaction(selector);
832837
}
833-
this.incrementXGoogRequestIdAttempt();
838+
this.ensureNonNullXGoogRequestId();
834839
SpannerRpc.StreamingCall call =
835840
rpc.executeQuery(
836841
request.build(),
@@ -1036,6 +1041,7 @@ CloseableIterator<PartialResultSet> startStream(
10361041
}
10371042
builder.setRequestOptions(buildRequestOptions(readOptions));
10381043
this.incrementXGoogRequestIdAttempt();
1044+
this.xGoogRequestId.setChannelId(session.getChannel());
10391045
SpannerRpc.StreamingCall call =
10401046
rpc.read(
10411047
builder.build(),

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,17 @@ long executeStreamingPartitionedUpdate(
7979
boolean foundStats = false;
8080
long updateCount = 0L;
8181
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
82-
Options options = Options.fromUpdateOptions(updateOptions);
83-
XGoogSpannerRequestId reqId = options.reqId();
84-
if (reqId == null) {
85-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 0);
86-
}
82+
XGoogSpannerRequestId reqId =
83+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
84+
UpdateOption[] allOptions = new UpdateOption[updateOptions.length + 1];
85+
System.arraycopy(updateOptions, 0, allOptions, 0, updateOptions.length);
86+
allOptions[allOptions.length - 1] = new Options.RequestIdOption(reqId);
87+
Options options = Options.fromUpdateOptions(allOptions);
8788

8889
try {
8990
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
9091

9192
while (true) {
92-
reqId.incrementAttempt();
9393
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
9494

9595
try {
@@ -110,6 +110,7 @@ long executeStreamingPartitionedUpdate(
110110
} catch (UnavailableException e) {
111111
LOGGER.log(
112112
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
113+
reqId.incrementAttempt();
113114
request = resumeOrRestartRequest(resumeToken, statement, request, options);
114115
} catch (InternalException e) {
115116
if (!isRetryableInternalErrorPredicate.apply(e)) {
@@ -118,6 +119,7 @@ long executeStreamingPartitionedUpdate(
118119

119120
LOGGER.log(
120121
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
122+
reqId.incrementAttempt();
121123
request = resumeOrRestartRequest(resumeToken, statement, request, options);
122124
} catch (AbortedException e) {
123125
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
@@ -126,7 +128,7 @@ long executeStreamingPartitionedUpdate(
126128
updateCount = 0L;
127129
request = newTransactionRequestFrom(statement, options);
128130
// Create a new xGoogSpannerRequestId.
129-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 0);
131+
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
130132
} catch (SpannerException e) {
131133
e.setRequestId(reqId);
132134
throw e;
@@ -141,7 +143,7 @@ long executeStreamingPartitionedUpdate(
141143
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
142144
return updateCount;
143145
} catch (Exception e) {
144-
throw SpannerExceptionFactory.newSpannerException(e);
146+
throw SpannerExceptionFactory.newSpannerException(e, reqId);
145147
}
146148
}
147149

@@ -221,10 +223,8 @@ private ByteString initTransaction(final Options options) {
221223
.setExcludeTxnFromChangeStreams(
222224
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
223225
.build();
224-
XGoogSpannerRequestId reqId = options.reqId();
225-
if (reqId == null) {
226-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
227-
}
226+
XGoogSpannerRequestId reqId =
227+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
228228
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
229229
if (tx.getId().isEmpty()) {
230230
throw SpannerExceptionFactory.newSpannerException(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,15 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
196196
}
197197
}
198198

199-
public void incrementXGoogRequestIdAttempt() {
199+
public void ensureNonNullXGoogRequestId() {
200200
if (this.xGoogRequestId == null) {
201201
this.xGoogRequestId =
202-
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 0 /*attempt*/);
202+
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
203203
}
204+
}
205+
206+
public void incrementXGoogRequestIdAttempt() {
207+
this.ensureNonNullXGoogRequestId();
204208
this.xGoogRequestId.incrementAttempt();
205209
}
206210

@@ -347,7 +351,8 @@ private void startGrpcStreaming() {
347351
stream.requestPrefetchChunks();
348352
if (this.xGoogRequestId == null) {
349353
this.xGoogRequestId =
350-
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: retrieve channelId*/, 0);
354+
this.xGoogRequestIdCreator.nextRequestId(
355+
1 /* channelId shall be replaced by the instantiated class. */, 0);
351356
}
352357
}
353358
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ DatabaseId getDatabaseId() {
223223
@Override
224224
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
225225
return XGoogSpannerRequestId.of(
226-
this.nthId, channelId, this.nthRequest.incrementAndGet(), attempt);
226+
this.nthId, this.nthRequest.incrementAndGet(), channelId, attempt);
227227
}
228228

229229
/** Create a single session. */
@@ -423,7 +423,7 @@ private List<SessionImpl> internalBatchCreateSessions(
423423
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
424424
try (IScope s = spanner.getTracer().withSpan(span)) {
425425
XGoogSpannerRequestId reqId =
426-
XGoogSpannerRequestId.of(this.nthId, channelHint, this.nthRequest.incrementAndGet(), 1);
426+
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
427427
List<com.google.spanner.v1.Session> sessions =
428428
spanner
429429
.getRpc()

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -600,7 +600,14 @@ int getChannel() {
600600
if (getIsMultiplexed()) {
601601
return 0;
602602
}
603-
Long channelHint = (Long) this.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
603+
Map<SpannerRpc.Option, ?> options = this.getOptions();
604+
if (options == null) {
605+
return 0;
606+
}
607+
Long channelHint = (Long) options.get(SpannerRpc.Option.CHANNEL_HINT);
608+
if (channelHint == null) {
609+
return 0;
610+
}
604611
return (int) (channelHint % this.spanner.getOptions().getNumChannels());
605612
}
606613
}

0 commit comments

Comments
 (0)