Skip to content

Commit 0ef0337

Browse files
olavloitelqiu96
authored andcommitted
chore: add experimental feature for retrying on different gRPC channel (#3273)
* chore: add experimental feature for retrying on different gRPC channel * chore: cleanup * chore: unflake test cases
1 parent c9e9d60 commit 0ef0337

16 files changed

+739
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import com.google.cloud.spanner.SessionClient.SessionOption;
3737
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
3838
import com.google.cloud.spanner.spi.v1.SpannerRpc;
39+
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
3940
import com.google.common.annotations.VisibleForTesting;
4041
import com.google.common.base.Preconditions;
4142
import com.google.common.collect.ImmutableMap;
@@ -45,7 +46,6 @@
4546
import com.google.spanner.v1.DirectedReadOptions;
4647
import com.google.spanner.v1.ExecuteBatchDmlRequest;
4748
import com.google.spanner.v1.ExecuteSqlRequest;
48-
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
4949
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
5050
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
5151
import com.google.spanner.v1.PartialResultSet;
@@ -184,7 +184,7 @@ static Builder newBuilder() {
184184
@GuardedBy("lock")
185185
private boolean used;
186186

187-
private final Map<SpannerRpc.Option, ?> channelHint;
187+
private Map<SpannerRpc.Option, ?> channelHint;
188188

189189
private SingleReadContext(Builder builder) {
190190
super(builder);
@@ -227,6 +227,16 @@ TransactionSelector getTransactionSelector() {
227227
Map<SpannerRpc.Option, ?> getTransactionChannelHint() {
228228
return channelHint;
229229
}
230+
231+
@Override
232+
boolean prepareRetryOnDifferentGrpcChannel() {
233+
if (session.getIsMultiplexed() && channelHint.get(Option.CHANNEL_HINT) != null) {
234+
long channelHintForTransaction = Option.CHANNEL_HINT.getLong(channelHint) + 1L;
235+
channelHint = optionMap(SessionOption.channelHint(channelHintForTransaction));
236+
return true;
237+
}
238+
return super.prepareRetryOnDifferentGrpcChannel();
239+
}
230240
}
231241

232242
private static void assertTimestampAvailable(boolean available) {
@@ -745,6 +755,7 @@ ResultSet executeQueryInternalWithOptions(
745755
span,
746756
tracer,
747757
tracer.createStatementAttributes(statement, options),
758+
session.getErrorHandler(),
748759
rpc.getExecuteQueryRetrySettings(),
749760
rpc.getExecuteQueryRetryableCodes()) {
750761
@Override
@@ -774,6 +785,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
774785
stream.setCall(call, request.getTransaction().hasBegin());
775786
return stream;
776787
}
788+
789+
@Override
790+
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
791+
return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel();
792+
}
777793
};
778794
return new GrpcResultSet(
779795
stream, this, options.hasDecodeMode() ? options.decodeMode() : defaultDecodeMode);
@@ -840,6 +856,10 @@ public void close() {
840856
*/
841857
abstract Map<SpannerRpc.Option, ?> getTransactionChannelHint();
842858

859+
boolean prepareRetryOnDifferentGrpcChannel() {
860+
return false;
861+
}
862+
843863
/**
844864
* Returns the transaction tag for this {@link AbstractReadContext} or <code>null</code> if this
845865
* {@link AbstractReadContext} does not have a transaction tag.
@@ -918,6 +938,7 @@ ResultSet readInternalWithOptions(
918938
SpannerImpl.READ,
919939
span,
920940
tracer,
941+
session.getErrorHandler(),
921942
rpc.getReadRetrySettings(),
922943
rpc.getReadRetryableCodes()) {
923944
@Override
@@ -945,6 +966,11 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
945966
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
946967
return stream;
947968
}
969+
970+
@Override
971+
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
972+
return AbstractReadContext.this.prepareRetryOnDifferentGrpcChannel();
973+
}
948974
};
949975
return new GrpcResultSet(
950976
stream, this, readOptions.hasDecodeMode() ? readOptions.decodeMode() : defaultDecodeMode);
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import com.google.api.core.BetaApi;
20+
import javax.annotation.Nonnull;
21+
22+
/**
23+
* The {@link ErrorHandler} interface can be used to implement custom error and retry handling for
24+
* specific cases. The default implementation does nothing and falls back to the standard error and
25+
* retry handling in Gax and the Spanner client.
26+
*/
27+
@BetaApi
28+
interface ErrorHandler {
29+
@Nonnull
30+
Throwable translateException(@Nonnull Throwable exception);
31+
32+
int getMaxAttempts();
33+
34+
class DefaultErrorHandler implements ErrorHandler {
35+
static final DefaultErrorHandler INSTANCE = new DefaultErrorHandler();
36+
37+
private DefaultErrorHandler() {}
38+
39+
@Nonnull
40+
@Override
41+
public Throwable translateException(@Nonnull Throwable exception) {
42+
return exception;
43+
}
44+
45+
@Override
46+
public int getMaxAttempts() {
47+
return 0;
48+
}
49+
}
50+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
5757
implements CloseableIterator<PartialResultSet> {
5858
private static final RetrySettings DEFAULT_STREAMING_RETRY_SETTINGS =
5959
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetrySettings();
60+
private final ErrorHandler errorHandler;
6061
private final RetrySettings streamingRetrySettings;
6162
private final Set<Code> retryableCodes;
6263
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
@@ -80,6 +81,7 @@ protected ResumableStreamIterator(
8081
String streamName,
8182
ISpan parent,
8283
TraceWrapper tracer,
84+
ErrorHandler errorHandler,
8385
RetrySettings streamingRetrySettings,
8486
Set<Code> retryableCodes) {
8587
this(
@@ -88,6 +90,7 @@ protected ResumableStreamIterator(
8890
parent,
8991
tracer,
9092
Attributes.empty(),
93+
errorHandler,
9194
streamingRetrySettings,
9295
retryableCodes);
9396
}
@@ -98,12 +101,14 @@ protected ResumableStreamIterator(
98101
ISpan parent,
99102
TraceWrapper tracer,
100103
Attributes attributes,
104+
ErrorHandler errorHandler,
101105
RetrySettings streamingRetrySettings,
102106
Set<Code> retryableCodes) {
103107
checkArgument(maxBufferSize >= 0);
104108
this.maxBufferSize = maxBufferSize;
105109
this.tracer = tracer;
106110
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent, attributes);
111+
this.errorHandler = errorHandler;
107112
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
108113
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
109114
}
@@ -193,6 +198,14 @@ public void execute(Runnable command) {
193198

194199
abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken);
195200

201+
/**
202+
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
203+
* possible, and false otherwise. A retry should only be attempted if the method returns true.
204+
*/
205+
boolean prepareIteratorForRetryOnDifferentGrpcChannel() {
206+
return false;
207+
}
208+
196209
@Override
197210
public void close(@Nullable String message) {
198211
if (stream != null) {
@@ -209,6 +222,7 @@ public boolean isWithBeginTransaction() {
209222

210223
@Override
211224
protected PartialResultSet computeNext() {
225+
int numAttemptsOnOtherChannel = 0;
212226
Context context = Context.current();
213227
while (true) {
214228
// Eagerly start stream before consuming any buffered items.
@@ -279,6 +293,17 @@ protected PartialResultSet computeNext() {
279293

280294
continue;
281295
}
296+
// Check if we should retry the request on a different gRPC channel.
297+
if (resumeToken == null && buffer.isEmpty()) {
298+
Throwable translated = errorHandler.translateException(spannerException);
299+
if (translated instanceof RetryOnDifferentGrpcChannelException) {
300+
if (++numAttemptsOnOtherChannel < errorHandler.getMaxAttempts()
301+
&& prepareIteratorForRetryOnDifferentGrpcChannel()) {
302+
stream = null;
303+
continue;
304+
}
305+
}
306+
}
282307
span.addAnnotation("Stream broken. Not safe to retry", spannerException);
283308
span.setStatus(spannerException);
284309
throw spannerException;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
20+
21+
import com.google.api.core.BetaApi;
22+
import com.google.cloud.spanner.spi.v1.SpannerRpc.Option;
23+
import javax.annotation.Nonnull;
24+
25+
/**
26+
* An experimental error handler that allows DEADLINE_EXCEEDED errors to be retried on a different
27+
* gRPC channel. This handler is only used if the system property
28+
* 'spanner.retry_deadline_exceeded_on_different_channel' has been set to true, and it is only used
29+
* in the following specific cases:
30+
*
31+
* <ol>
32+
* <li>A DEADLINE_EXCEEDED error during a read/write transaction. The error is translated to a
33+
* {@link RetryOnDifferentGrpcChannelException}, which is caught by the session pool and
34+
* causes a retry of the entire transaction on a different session and different gRPC channel.
35+
* <li>A DEADLINE_EXCEEDED error during a single-use read-only transaction using a multiplexed
36+
* session. Note that errors for the same using a regular session are not retried.
37+
* </ol>
38+
*/
39+
@BetaApi
40+
class RetryOnDifferentGrpcChannelErrorHandler implements ErrorHandler {
41+
private final int maxAttempts;
42+
43+
private final SessionImpl session;
44+
45+
static boolean isEnabled() {
46+
return Boolean.parseBoolean(
47+
System.getProperty("spanner.retry_deadline_exceeded_on_different_channel", "false"));
48+
}
49+
50+
RetryOnDifferentGrpcChannelErrorHandler(int maxAttempts, SessionImpl session) {
51+
this.maxAttempts = maxAttempts;
52+
this.session = session;
53+
}
54+
55+
@Override
56+
@Nonnull
57+
public Throwable translateException(@Nonnull Throwable exception) {
58+
if (session == null || !(exception instanceof SpannerException)) {
59+
return exception;
60+
}
61+
SpannerException spannerException = (SpannerException) exception;
62+
if (spannerException.getErrorCode() == ErrorCode.DEADLINE_EXCEEDED) {
63+
if (session.getIsMultiplexed()
64+
|| (session.getOptions() != null
65+
&& session.getOptions().containsKey(Option.CHANNEL_HINT))) {
66+
int channel = NO_CHANNEL_HINT;
67+
if (session.getOptions() != null && session.getOptions().containsKey(Option.CHANNEL_HINT)) {
68+
channel = Option.CHANNEL_HINT.getLong(session.getOptions()).intValue();
69+
}
70+
return SpannerExceptionFactory.newRetryOnDifferentGrpcChannelException(
71+
"Retrying on a new gRPC channel due to a DEADLINE_EXCEEDED error",
72+
channel,
73+
spannerException);
74+
}
75+
}
76+
return spannerException;
77+
}
78+
79+
@Override
80+
public int getMaxAttempts() {
81+
return maxAttempts;
82+
}
83+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import javax.annotation.Nullable;
20+
21+
class RetryOnDifferentGrpcChannelException extends SpannerException {
22+
private final int channel;
23+
24+
RetryOnDifferentGrpcChannelException(
25+
@Nullable String message, int channel, @Nullable Throwable cause) {
26+
// Note: We set retryable=false, as the exception is not retryable in the standard way.
27+
super(DoNotConstructDirectly.ALLOWED, ErrorCode.INTERNAL, /*retryable=*/ false, message, cause);
28+
this.channel = channel;
29+
}
30+
31+
int getChannel() {
32+
return this.channel;
33+
}
34+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.spanner.AbstractReadContext.MultiUseReadOnlyTransaction;
2727
import com.google.cloud.spanner.AbstractReadContext.SingleReadContext;
2828
import com.google.cloud.spanner.AbstractReadContext.SingleUseReadOnlyTransaction;
29+
import com.google.cloud.spanner.ErrorHandler.DefaultErrorHandler;
2930
import com.google.cloud.spanner.Options.TransactionOption;
3031
import com.google.cloud.spanner.Options.UpdateOption;
3132
import com.google.cloud.spanner.SessionClient.SessionOption;
@@ -116,6 +117,7 @@ interface SessionTransaction {
116117
private ISpan currentSpan;
117118
private final Clock clock;
118119
private final Map<SpannerRpc.Option, ?> options;
120+
private final ErrorHandler errorHandler;
119121

120122
SessionImpl(SpannerImpl spanner, SessionReference sessionReference) {
121123
this(spanner, sessionReference, NO_CHANNEL_HINT);
@@ -127,6 +129,7 @@ interface SessionTransaction {
127129
this.sessionReference = sessionReference;
128130
this.clock = spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
129131
this.options = createOptions(sessionReference, channelHint);
132+
this.errorHandler = createErrorHandler(spanner.getOptions());
130133
}
131134

132135
static Map<SpannerRpc.Option, ?> createOptions(
@@ -137,6 +140,13 @@ interface SessionTransaction {
137140
return CHANNEL_HINT_OPTIONS[channelHint % CHANNEL_HINT_OPTIONS.length];
138141
}
139142

143+
private ErrorHandler createErrorHandler(SpannerOptions options) {
144+
if (RetryOnDifferentGrpcChannelErrorHandler.isEnabled()) {
145+
return new RetryOnDifferentGrpcChannelErrorHandler(options.getNumChannels(), this);
146+
}
147+
return DefaultErrorHandler.INSTANCE;
148+
}
149+
140150
@Override
141151
public String getName() {
142152
return sessionReference.getName();
@@ -146,6 +156,10 @@ public String getName() {
146156
return options;
147157
}
148158

159+
ErrorHandler getErrorHandler() {
160+
return this.errorHandler;
161+
}
162+
149163
void setCurrentSpan(ISpan span) {
150164
currentSpan = span;
151165
}

0 commit comments

Comments
 (0)