Skip to content

fix: make sure commitAsync always finishes #3216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -306,12 +306,23 @@ private void createTxnAsync(final SettableApiFuture<Void> res) {

void commit() {
try {
commitResponse = commitAsync().get();
} catch (InterruptedException e) {
// Normally, Gax will take care of any timeouts, but we add a timeout for getting the value
// from the future here as well to make sure the call always finishes, even if the future
// never resolves.
commitResponse =
commitAsync()
.get(
rpc.getCommitRetrySettings().getTotalTimeout().getSeconds() + 5,
TimeUnit.SECONDS);
} catch (InterruptedException | TimeoutException e) {
if (commitFuture != null) {
commitFuture.cancel(true);
}
throw SpannerExceptionFactory.propagateInterrupt(e);
if (e instanceof InterruptedException) {
throw SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
} else {
throw SpannerExceptionFactory.propagateTimeout((TimeoutException) e);
}
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause());
}
Expand Down Expand Up @@ -422,6 +433,14 @@ public void run() {
commitFuture.addListener(
() -> {
try (IScope ignore = tracer.withSpan(opSpan)) {
if (!commitFuture.isDone()) {
// This should not be possible, considering that we are in a listener for the
// future, but we add a result here as well as a safety precaution.
res.setException(
SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "commitFuture is not done"));
return;
}
com.google.spanner.v1.CommitResponse proto = commitFuture.get();
if (!proto.hasCommitTimestamp()) {
throw newSpannerException(
Expand All @@ -430,30 +449,35 @@ public void run() {
span.addAnnotation("Commit Done");
opSpan.end();
res.set(new CommitResponse(proto));
} catch (Throwable e) {
if (e instanceof ExecutionException) {
e =
SpannerExceptionFactory.newSpannerException(
e.getCause() == null ? e : e.getCause());
} else if (e instanceof InterruptedException) {
e = SpannerExceptionFactory.propagateInterrupt((InterruptedException) e);
} else {
e = SpannerExceptionFactory.newSpannerException(e);
} catch (Throwable throwable) {
SpannerException resultException;
try {
if (throwable instanceof ExecutionException) {
resultException =
SpannerExceptionFactory.asSpannerException(
throwable.getCause() == null ? throwable : throwable.getCause());
} else if (throwable instanceof InterruptedException) {
resultException =
SpannerExceptionFactory.propagateInterrupt(
(InterruptedException) throwable);
} else {
resultException = SpannerExceptionFactory.asSpannerException(throwable);
}
span.addAnnotation("Commit Failed", resultException);
opSpan.setStatus(resultException);
opSpan.end();
res.setException(onError(resultException, false));
} catch (Throwable unexpectedError) {
// This is a safety precaution to make sure that a result is always returned.
res.setException(unexpectedError);
}
span.addAnnotation("Commit Failed", e);
opSpan.setStatus(e);
opSpan.end();
res.setException(onError((SpannerException) e, false));
}
},
MoreExecutors.directExecutor());
} catch (InterruptedException e) {
res.setException(SpannerExceptionFactory.propagateInterrupt(e));
} catch (TimeoutException e) {
res.setException(SpannerExceptionFactory.propagateTimeout(e));
} catch (ExecutionException e) {
res.setException(
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
} catch (Throwable e) {
res.setException(
SpannerExceptionFactory.newSpannerException(e.getCause() == null ? e : e.getCause()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final Set<Code> executeQueryRetryableCodes;
private final RetrySettings readRetrySettings;
private final Set<Code> readRetryableCodes;
private final RetrySettings commitRetrySettings;
private final SpannerStub partitionedDmlStub;
private final RetrySettings partitionedDmlRetrySettings;
private final InstanceAdminStubSettings instanceAdminStubSettings;
Expand Down Expand Up @@ -398,6 +399,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetrySettings();
this.executeQueryRetryableCodes =
options.getSpannerStubSettings().executeStreamingSqlSettings().getRetryableCodes();
this.commitRetrySettings =
options.getSpannerStubSettings().commitSettings().getRetrySettings();
partitionedDmlRetrySettings =
options
.getSpannerStubSettings()
Expand Down Expand Up @@ -508,6 +511,8 @@ public <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUnaryCalla
this.readRetryableCodes = null;
this.executeQueryRetrySettings = null;
this.executeQueryRetryableCodes = null;
this.commitRetrySettings =
SpannerStubSettings.newBuilder().commitSettings().getRetrySettings();
this.partitionedDmlStub = null;
this.databaseAdminStubSettings = null;
this.instanceAdminStubSettings = null;
Expand Down Expand Up @@ -1801,6 +1806,11 @@ public CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option,
return get(commitAsync(commitRequest, options));
}

@Override
public RetrySettings getCommitRetrySettings() {
return commitRetrySettings;
}

@Override
public ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options) {
GrpcCallContext context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,10 @@ CommitResponse commit(CommitRequest commitRequest, @Nullable Map<Option, ?> opti
ApiFuture<CommitResponse> commitAsync(
CommitRequest commitRequest, @Nullable Map<Option, ?> options);

default RetrySettings getCommitRetrySettings() {
return SpannerStubSettings.newBuilder().commitSettings().getRetrySettings();
}

void rollback(RollbackRequest request, @Nullable Map<Option, ?> options) throws SpannerException;

ApiFuture<Empty> rollbackAsync(RollbackRequest request, @Nullable Map<Option, ?> options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,8 @@ public void setUp() {
when(rpc.getExecuteQueryRetryableCodes())
.thenReturn(
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes());
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
session = spanner.getSessionClient(db).createSession();
Span oTspan = mock(Span.class);
ISpan span = new OpenTelemetrySpan(oTspan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import com.google.rpc.Code;
Expand Down Expand Up @@ -80,6 +81,8 @@ public void setup() {
when(tracer.spanBuilderWithExplicitParent(
eq(SpannerImpl.BATCH_UPDATE), eq(span), any(Attributes.class)))
.thenReturn(span);
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
}

private TransactionContextImpl createContext() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory;
import com.google.cloud.spanner.TransactionManager.TransactionState;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.protobuf.ByteString;
import com.google.protobuf.Empty;
import com.google.spanner.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -248,6 +249,8 @@ public void usesPreparedTransaction() {
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down Expand Up @@ -332,6 +335,8 @@ public void inlineBegin() {
com.google.protobuf.Timestamp.newBuilder()
.setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.google.cloud.spanner.SessionClient.SessionId;
import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.Duration;
Expand Down Expand Up @@ -141,6 +142,8 @@ public void setUp() {
CommitResponse.newBuilder()
.setCommitTimestamp(Timestamp.getDefaultInstance())
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
when(rpc.rollbackAsync(Mockito.any(RollbackRequest.class), Mockito.anyMap()))
.thenReturn(ApiFutures.immediateFuture(Empty.getDefaultInstance()));
Span oTspan = mock(Span.class);
Expand Down Expand Up @@ -196,6 +199,8 @@ public void usesPreparedTransaction() {
.setCommitTimestamp(
Timestamp.newBuilder().setSeconds(System.currentTimeMillis() * 1000))
.build()));
when(rpc.getCommitRetrySettings())
.thenReturn(SpannerStubSettings.newBuilder().commitSettings().getRetrySettings());
DatabaseId db = DatabaseId.of("test", "test", "test");
try (SpannerImpl spanner = new SpannerImpl(rpc, options)) {
DatabaseClient client = spanner.getDatabaseClient(db);
Expand Down
Loading