Skip to content

chore: add experimental feature for retrying on different gRPC channel #3273

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.google.cloud.spanner.SessionClient.SessionOption;
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
Expand All @@ -45,7 +46,6 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
Expand Down Expand Up @@ -184,7 +184,7 @@ static Builder newBuilder() {
@GuardedBy("lock")
private boolean used;

private final Map<SpannerRpc.Option, ?> channelHint;
private Map<SpannerRpc.Option, ?> channelHint;

private SingleReadContext(Builder builder) {
super(builder);
Expand Down Expand Up @@ -227,6 +227,16 @@ TransactionSelector getTransactionSelector() {
Map<SpannerRpc.Option, ?> getTransactionChannelHint() {
return channelHint;
}

@Override
boolean prepareRetryOnDifferentGrpcChannel() {
if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) {
long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L;
channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction));
return true;
}
return super.prepareRetryOnDifferentGrpcChannel();
}
}

private static void assertTimestampAvailable(boolean available) {
Expand Down Expand Up @@ -745,6 +755,7 @@ ResultSet executeQueryInternalWithOptions(
span,
tracer,
tracer.createStatementAttributes(statement, options),
session.getErrorHandler(),
rpc.getExecuteQueryRetrySettings(),
rpc.getExecuteQueryRetryableCodes()) {
@Override
Expand Down Expand Up @@ -774,6 +785,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
stream.setCall(call, request.getTransaction().hasBegin());
return stream;
}

@Override
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel();
}
};
return new GrpcResultSet(
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
Expand Down Expand Up @@ -840,6 +856,10 @@ public void close() {
*/
abstract Map<SpannerRpc.Option, ?> getTransactionChannelHint();

boolean prepareRetryOnDifferentGrpcChannel() {
return false;
}

/**
* Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
* {@link AbstractReadContext} does not have a transaction tag.
Expand Down Expand Up @@ -918,6 +938,7 @@ ResultSet readInternalWithOptions(
SpannerImpl.READ,
span,
tracer,
session.getErrorHandler(),
rpc.getReadRetrySettings(),
rpc.getReadRetryableCodes()) {
@Override
Expand Down Expand Up @@ -945,6 +966,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
}

@Override
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel();
}
};
return new GrpcResultSet(
stream, this, readOptions.hasDecodeMode() ? readOptions.decodeMode() : defaultDecodeMode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import com.google.api.core.BetaApi;
import javax.annotation.Nonnull;

/**
* The {@link ErrorHandler} interface can be used to implement custom error and retry handling for
* specific cases. The default implementation does nothing and falls back to the standard error and
* retry handling in Gax and the Spanner client.
*/
@BetaApi
interface ErrorHandler {
@Nonnull
Throwable translateException(@Nonnull Throwable exception);

int getMaxAttempts();

class DefaultErrorHandler implements ErrorHandler {
static final DefaultErrorHandler INSTANCE = new DefaultErrorHandler();

private DefaultErrorHandler() {}

@Nonnull
@Override
public Throwable translateException(@Nonnull Throwable exception) {
return exception;
}

@Override
public int getMaxAttempts() {
return 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
implements CloseableIterator<PartialResultSet> {
private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
private final ErrorHandler errorHandler;
private final RetrySettings streamingRetrySettings;
private final Set<Code> retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
Expand All @@ -80,6 +81,7 @@ protected ResumableStreamIterator(
String streamName,
ISpan parent,
TraceWrapper tracer,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
this(
Expand All @@ -88,6 +90,7 @@ protected ResumableStreamIterator(
parent,
tracer,
Attributes.empty(),
errorHandler,
streamingRetrySettings,
retryableCodes);
}
Expand All @@ -98,12 +101,14 @@ protected ResumableStreamIterator(
ISpan parent,
TraceWrapper tracer,
Attributes attributes,
ErrorHandler errorHandler,
RetrySettings streamingRetrySettings,
Set<Code> retryableCodes) {
checkArgument(maxBufferSize >= 0);
this.maxBufferSize = maxBufferSize;
this.tracer = tracer;
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
this.errorHandler = errorHandler;
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
}
Expand Down Expand Up @@ -193,6 +198,14 @@ public void execute(Runnable command) {

abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken);

/**
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
* possible, and false otherwise. A retry should only be attempted if the method returns true.
*/
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
return false;
}

@Override
public void close(@Nullable String message) {
if (stream != null) {
Expand All @@ -209,6 +222,7 @@ public boolean isWithBeginTransaction() {

@Override
protected PartialResultSet computeNext() {
int numAttemptsOnOtherChannel = 0;
Context context = Context.current();
while (true) {
// Eagerly start stream before consuming any buffered items.
Expand Down Expand Up @@ -279,6 +293,17 @@ protected PartialResultSet computeNext() {

continue;
}
// Check if we should retry the request on a different gRPC channel.
if (resumeToken == null && buffer.isEmpty()) {
Throwable translated = errorHandler.translateException(spannerException);
if (translated instanceof RetryOnDifferentGrpcChannelException) {
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
stream = null;
continue;
}
}
}
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
span.setStatus(spannerException);
throw spannerException;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;

import com.google.api.core.BetaApi;
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
import javax.annotation.Nonnull;

/**
* An experimental error handler that allows DEADLINE_EXCEEDED errors to be retried on a different
* gRPC channel. This handler is only used if the system property
* 'spanner.retry_deadline_exceeded_on_different_channel' has been set to true, and it is only used
* in the following specific cases:
*
* <ol>
* <li>A DEADLINE_EXCEEDED error during a read/write transaction. The error is translated to a
* {@link RetryOnDifferentGrpcChannelException}, which is caught by the session pool and
* causes a retry of the entire transaction on a different session and different gRPC channel.
* <li>A DEADLINE_EXCEEDED error during a single-use read-only transaction using a multiplexed
* session. Note that errors for the same using a regular session are not retried.
* </ol>
*/
@BetaApi
class RetryOnDifferentGrpcChannelErrorHandler implements ErrorHandler {
private final int maxAttempts;

private final SessionImpl session;

static boolean isEnabled() {
return Boolean.parseBoolean(
System.getProperty("spanner.retry_deadline_exceeded_on_different_channel", "false"));
}

RetryOnDifferentGrpcChannelErrorHandler(int maxAttempts, SessionImpl session) {
this.maxAttempts = maxAttempts;
this.session = session;
}

@Override
@Nonnull
public Throwable translateException(@Nonnull Throwable exception) {
if (session == null || !(exception instanceof SpannerException)) {
return exception;
}
SpannerException spannerException = (SpannerException) exception;
if (spannerException.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) {
if (session.getIsMultiplexed()
|| (session.getOptions() != null
&& session.getOptions().containsKey(Option.CHANNEL_HINT))) {
int channel = NO_CHANNEL_HINT;
if (session.getOptions() != null && session.getOptions().containsKey(Option.CHANNEL_HINT)) {
channel = Option.CHANNEL_HINT.getLong(session.getOptions()).intValue();
}
return SpannerExceptionFactory.newRetryOnDifferentGrpcChannelException(
"Retrying on a new gRPC channel due to a DEADLINE_EXCEEDED error",
channel,
spannerException);
}
}
return spannerException;
}

@Override
public int getMaxAttempts() {
return maxAttempts;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.spanner;

import javax.annotation.Nullable;

class RetryOnDifferentGrpcChannelException extends SpannerException {
private final int channel;

RetryOnDifferentGrpcChannelException(
@Nullable String message, int channel, @Nullable Throwable cause) {
// Note: We set retryable=false, as the exception is not retryable in the standard way.
super(DoNotConstructDirectly.ALLOWED, ErrorCode.INTERNAL, /*retryable=*/ false, message, cause);
this.channel = channel;
}

int getChannel() {
return this.channel;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler;
import com.google.cloud.spanner.Options.TransactionOption;
import com.google.cloud.spanner.Options.UpdateOption;
import com.google.cloud.spanner.SessionClient.SessionOption;
Expand Down Expand Up @@ -116,6 +117,7 @@ interface SessionTransaction {
private ISpan currentSpan;
private final Clock clock;
private final Map<SpannerRpc.Option, ?> options;
private final ErrorHandler errorHandler;

SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
this(spanner, sessionReference, NO_CHANNEL_HINT);
Expand All @@ -127,6 +129,7 @@ interface SessionTransaction {
this.sessionReference = sessionReference;
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
this.options = createOptions(sessionReference, channelHint);
this.errorHandler = createErrorHandler(spanner.getOptions());
}

static Map<SpannerRpc.Option, ?> createOptions(
Expand All @@ -137,6 +140,13 @@ interface SessionTransaction {
return CHANNEL_HINT_OPTIONS[channelHint % CHANNEL_HINT_OPTIONS.length];
}

private ErrorHandler createErrorHandler(SpannerOptions options) {
if (RetryOnDifferentGrpcChannelErrorHandler.isEnabled()) {
return new RetryOnDifferentGrpcChannelErrorHandler(options.getNumChannels(), this);
}
return DefaultErrorHandler.INSTANCE;
}

@Override
public String getName() {
return sessionReference.getName();
Expand All @@ -146,6 +156,10 @@ public String getName() {
return options;
}

ErrorHandler getErrorHandler() {
return this.errorHandler;
}

void setCurrentSpan(ISpan span) {
currentSpan = span;
}
Expand Down
Loading
Loading