Skip to content

chore(x-goog-spanner-request-id): propagate reqId into exceptions plus prior code review suggestions #3922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -457,22 +457,30 @@ void initTransaction() {
}

private void initTransactionInternal(BeginTransactionRequest request) {
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
try {
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
rpc.beginTransaction(
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
ErrorCode.INTERNAL,
"Missing expected transaction.read_timestamp metadata field",
reqId);
}
if (transaction.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
}
try {
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
} catch (IllegalArgumentException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
ErrorCode.INTERNAL,
"Bad value in transaction.read_timestamp metadata field",
e,
reqId);
}
transactionId = transaction.getId();
span.addAnnotation(
Expand Down Expand Up @@ -803,7 +811,8 @@ ResultSet executeQueryInternalWithOptions(
tracer.createStatementAttributes(statement, options),
session.getErrorHandler(),
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
rpc.getExecuteQueryRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -826,11 +835,12 @@ CloseableIterator<PartialResultSet> startStream(
if (selector != null) {
request.setTransaction(selector);
}
this.ensureNonNullXGoogRequestId();
SpannerRpc.StreamingCall call =
rpc.executeQuery(
request.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, request.getTransaction().hasBegin());
Expand Down Expand Up @@ -1008,7 +1018,8 @@ ResultSet readInternalWithOptions(
tracer.createTableAttributes(table, readOptions),
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
rpc.getReadRetryableCodes(),
session.getRequestIdCreator()) {
@Override
CloseableIterator<PartialResultSet> startStream(
@Nullable ByteString resumeToken,
Expand All @@ -1029,11 +1040,13 @@ CloseableIterator<PartialResultSet> startStream(
builder.setTransaction(selector);
}
builder.setRequestOptions(buildRequestOptions(readOptions));
this.incrementXGoogRequestIdAttempt();
this.xGoogRequestId.setChannelId(session.getChannel());
SpannerRpc.StreamingCall call =
rpc.read(
builder.build(),
stream.consumer(),
getTransactionChannelHint(),
this.xGoogRequestId.withOptions(getTransactionChannelHint()),
isRouteToLeader());
session.markUsed(clock.instant());
stream.setCall(call, /* withBeginTransaction= */ builder.getTransaction().hasBegin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,11 @@ private List<Partition> partitionReadUsingIndex(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
final PartitionReadRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionRead(request, options);
PartitionResponse response = rpc.partitionRead(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -272,6 +274,7 @@ private List<Partition> partitionReadUsingIndex(
return partitionReadUsingIndex(
partitionOptions, table, index, keys, columns, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down Expand Up @@ -313,9 +316,11 @@ private List<Partition> partitionQuery(
}
builder.setPartitionOptions(pbuilder.build());

XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
final PartitionQueryRequest request = builder.build();
try {
PartitionResponse response = rpc.partitionQuery(request, options);
PartitionResponse response = rpc.partitionQuery(request, reqId.withOptions(options));
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Expand All @@ -328,6 +333,7 @@ private List<Partition> partitionQuery(
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
return partitionQuery(partitionOptions, statement, true, option);
}
e.setRequestId(reqId);
throw e;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,12 @@ long executeStreamingPartitionedUpdate(
boolean foundStats = false;
long updateCount = 0L;
Stopwatch stopwatch = Stopwatch.createStarted(ticker);
Options options = Options.fromUpdateOptions(updateOptions);
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
UpdateOption[] allOptions = new UpdateOption[updateOptions.length + 1];
System.arraycopy(updateOptions, 0, allOptions, 0, updateOptions.length);
allOptions[allOptions.length - 1] = new Options.RequestIdOption(reqId);
Options options = Options.fromUpdateOptions(allOptions);

try {
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
Expand All @@ -89,7 +94,8 @@ long executeStreamingPartitionedUpdate(

try {
ServerStream<PartialResultSet> stream =
rpc.executeStreamingPartitionedDml(request, session.getOptions(), remainingTimeout);
rpc.executeStreamingPartitionedDml(
request, reqId.withOptions(session.getOptions()), remainingTimeout);

for (PartialResultSet rs : stream) {
if (rs.getResumeToken() != null && !rs.getResumeToken().isEmpty()) {
Expand All @@ -104,6 +110,7 @@ long executeStreamingPartitionedUpdate(
} catch (UnavailableException e) {
LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
reqId.incrementAttempt();
request = resumeOrRestartRequest(resumeToken, statement, request, options);
} catch (InternalException e) {
if (!isRetryableInternalErrorPredicate.apply(e)) {
Expand All @@ -112,24 +119,31 @@ long executeStreamingPartitionedUpdate(

LOGGER.log(
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
reqId.incrementAttempt();
request = resumeOrRestartRequest(resumeToken, statement, request, options);
} catch (AbortedException e) {
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
resumeToken = ByteString.EMPTY;
foundStats = false;
updateCount = 0L;
request = newTransactionRequestFrom(statement, options);
// Create a new xGoogSpannerRequestId.
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
} catch (SpannerException e) {
e.setRequestId(reqId);
throw e;
}
}
if (!foundStats) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INVALID_ARGUMENT,
"Partitioned DML response missing stats possibly due to non-DML statement as input");
"Partitioned DML response missing stats possibly due to non-DML statement as input",
reqId);
}
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
return updateCount;
} catch (Exception e) {
throw SpannerExceptionFactory.newSpannerException(e);
throw SpannerExceptionFactory.newSpannerException(e, reqId);
}
}

Expand Down Expand Up @@ -209,11 +223,14 @@ private ByteString initTransaction(final Options options) {
.setExcludeTxnFromChangeStreams(
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
.build();
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
XGoogSpannerRequestId reqId =
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
Transaction tx = rpc.beginTransaction(request, reqId.withOptions(session.getOptions()), true);
if (tx.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL,
"Failed to init transaction, missing transaction id\n" + session.getName());
"Failed to init transaction, missing transaction id\n" + session.getName(),
reqId);
}
return tx.getId();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
private CloseableIterator<PartialResultSet> stream;
private ByteString resumeToken;
private boolean finished;
public XGoogSpannerRequestId xGoogRequestId;
private XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator;

/**
* Indicates whether it is currently safe to retry RPCs. This will be {@code false} if we have
Expand All @@ -86,7 +88,8 @@ protected ResumableStreamIterator(
TraceWrapper tracer,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
Set<Code> retryableCodes,
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
this(
maxBufferSize,
streamName,
Expand All @@ -95,7 +98,8 @@ protected ResumableStreamIterator(
Attributes.empty(),
errorHandler,
streamingRetrySettings,
retryableCodes);
retryableCodes,
xGoogRequestIdCreator);
}

protected ResumableStreamIterator(
Expand All @@ -106,14 +110,16 @@ protected ResumableStreamIterator(
Attributes attributes,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
Set<Code> retryableCodes,
XGoogSpannerRequestId.RequestIdCreator xGoogRequestIdCreator) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.tracer = tracer;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
this.errorHandler = errorHandler;
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.xGoogRequestIdCreator = xGoogRequestIdCreator;
}

private ExponentialBackOff newBackOff() {
Expand Down Expand Up @@ -181,15 +187,27 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
}
if (latch.await(backoffMillis, TimeUnit.MILLISECONDS)) {
// Woken by context cancellation.
throw newSpannerExceptionForCancellation(context, null, null /*TODO: requestId*/);
throw newSpannerExceptionForCancellation(context, null, this.xGoogRequestId);
}
} catch (InterruptedException interruptExcept) {
throw newSpannerExceptionForCancellation(context, interruptExcept, null /*TODO: requestId*/);
throw newSpannerExceptionForCancellation(context, interruptExcept, this.xGoogRequestId);
} finally {
context.removeListener(listener);
}
}

public void ensureNonNullXGoogRequestId() {
if (this.xGoogRequestId == null) {
this.xGoogRequestId =
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
}
}

public void incrementXGoogRequestIdAttempt() {
this.ensureNonNullXGoogRequestId();
this.xGoogRequestId.incrementAttempt();
}

private enum DirectExecutor implements Executor {
INSTANCE;

Expand Down Expand Up @@ -281,6 +299,7 @@ protected PartialResultSet computeNext() {
}
assert buffer.isEmpty() || buffer.getLast().getResumeToken().equals(resumeToken);
stream = null;
incrementXGoogRequestIdAttempt();
try (IScope s = tracer.withSpan(span)) {
long delay = spannerException.getRetryDelayInMillis();
if (delay != -1) {
Expand All @@ -302,12 +321,14 @@ protected PartialResultSet computeNext() {
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
stream = null;
xGoogRequestId = null;
continue;
}
}
}
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
spannerException.setRequestId(this.xGoogRequestId);
throw spannerException;
} catch (RuntimeException e) {
span.addAnnotation("Stream broken. Not safe to retry", e);
Expand All @@ -328,6 +349,11 @@ private void startGrpcStreaming() {
// this Span.
stream = checkNotNull(startStream(resumeToken, streamMessageListener));
stream.requestPrefetchChunks();
if (this.xGoogRequestId == null) {
this.xGoogRequestId =
this.xGoogRequestIdCreator.nextRequestId(
1 /* channelId shall be replaced by the instantiated class. */, 0);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,11 +300,12 @@ public CommitResponse writeAtLeastOnceWithOptions(
}
CommitRequest request = requestBuilder.build();
ISpan span = tracer.spanBuilder(SpannerImpl.COMMIT);
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);

try (IScope s = tracer.withSpan(span)) {
return SpannerRetryHelper.runTxWithRetriesOnAborted(
() -> {
// On Aborted, we have to start a fresh request id.
final XGoogSpannerRequestId reqId = reqIdOrFresh(options);
return new CommitResponse(
spanner.getRpc().commit(request, reqId.withOptions(getOptions())));
});
Expand Down Expand Up @@ -464,15 +465,15 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...

@Override
public ApiFuture<Empty> asyncClose() {
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 0);
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions()));
}

@Override
public void close() {
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
try (IScope s = tracer.withSpan(span)) {
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 0);
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions()));
} catch (RuntimeException e) {
span.setStatus(e);
Expand Down Expand Up @@ -516,7 +517,7 @@ ApiFuture<Transaction> beginTransactionAsync(
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName());
ErrorCode.INTERNAL, "Missing id in transaction\n" + getName(), reqId);
}
span.end();
res.set(txn);
Expand All @@ -525,7 +526,7 @@ ApiFuture<Transaction> beginTransactionAsync(
span.end();
res.setException(
SpannerExceptionFactory.newSpannerException(
e.getCause() == null ? e : e.getCause()));
e.getCause() == null ? e : e.getCause(), reqId));
} catch (InterruptedException e) {
span.setStatus(e);
span.end();
Expand Down Expand Up @@ -599,7 +600,14 @@ int getChannel() {
if (getIsMultiplexed()) {
return 0;
}
Long channelHint = (Long) this.getOptions().get(SpannerRpc.Option.CHANNEL_HINT);
Map<SpannerRpc.Option, ?> options = this.getOptions();
if (options == null) {
return 0;
}
Long channelHint = (Long) options.get(SpannerRpc.Option.CHANNEL_HINT);
if (channelHint == null) {
return 0;
}
return (int) (channelHint % this.spanner.getOptions().getNumChannels());
}
}
Loading