Skip to content

Commit 661df5f

Browse files
committed
feat: add PartitionedUpdate support to executor
1 parent d75ebc1 commit 661df5f

File tree

1 file changed

+38
-27
lines changed

1 file changed

+38
-27
lines changed

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudClientExecutor.java

Lines changed: 38 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,9 @@
2222
import com.google.api.gax.paging.Page;
2323
import com.google.api.gax.retrying.RetrySettings;
2424
import com.google.api.gax.rpc.DeadlineExceededException;
25-
import com.google.api.gax.rpc.TransportChannelProvider;
2625
import com.google.api.gax.rpc.UnavailableException;
27-
import com.google.auth.Credentials;
28-
import com.google.auth.oauth2.GoogleCredentials;
2926
import com.google.cloud.ByteArray;
3027
import com.google.cloud.Date;
31-
import com.google.cloud.NoCredentials;
3228
import com.google.cloud.Timestamp;
3329
import com.google.cloud.spanner.Backup;
3430
import com.google.cloud.spanner.BatchClient;
@@ -52,6 +48,7 @@
5248
import com.google.cloud.spanner.Mutation;
5349
import com.google.cloud.spanner.Mutation.WriteBuilder;
5450
import com.google.cloud.spanner.Options;
51+
import com.google.cloud.spanner.Options.RpcPriority;
5552
import com.google.cloud.spanner.Partition;
5653
import com.google.cloud.spanner.PartitionOptions;
5754
import com.google.cloud.spanner.ReadContext;
@@ -128,6 +125,8 @@
128125
import com.google.spanner.executor.v1.MutationAction.Mod;
129126
import com.google.spanner.executor.v1.MutationAction.UpdateArgs;
130127
import com.google.spanner.executor.v1.OperationResponse;
128+
import com.google.spanner.executor.v1.PartitionedUpdateAction;
129+
import com.google.spanner.executor.v1.PartitionedUpdateAction.ExecutePartitionedUpdateOptions;
131130
import com.google.spanner.executor.v1.QueryAction;
132131
import com.google.spanner.executor.v1.ReadAction;
133132
import com.google.spanner.executor.v1.RestoreCloudDatabaseAction;
@@ -145,8 +144,6 @@
145144
import com.google.spanner.v1.TypeCode;
146145
import io.grpc.Status;
147146
import io.grpc.stub.StreamObserver;
148-
import java.io.ByteArrayInputStream;
149-
import java.io.File;
150147
import java.io.IOException;
151148
import java.io.ObjectInputStream;
152149
import java.io.ObjectOutputStream;
@@ -165,7 +162,6 @@
165162
import java.util.logging.Logger;
166163
import java.util.stream.Collectors;
167164
import javax.annotation.Nullable;
168-
import org.apache.commons.io.FileUtils;
169165
import org.jetbrains.annotations.NotNull;
170166
import org.threeten.bp.Duration;
171167
import org.threeten.bp.LocalDate;
@@ -747,20 +743,6 @@ private synchronized Spanner getClient() throws IOException {
747743

748744
// Return the spanner client, create one if not exists.
749745
private synchronized Spanner getClient(long timeoutSeconds) throws IOException {
750-
// Create a cloud spanner client
751-
Credentials credentials;
752-
if (WorkerProxy.serviceKeyFile.isEmpty()) {
753-
credentials = NoCredentials.getInstance();
754-
} else {
755-
credentials =
756-
GoogleCredentials.fromStream(
757-
new ByteArrayInputStream(
758-
FileUtils.readFileToByteArray(new File(WorkerProxy.serviceKeyFile))),
759-
HTTP_TRANSPORT_FACTORY);
760-
}
761-
762-
TransportChannelProvider channelProvider =
763-
CloudUtil.newChannelProviderHelper(WorkerProxy.spannerPort);
764746

765747
Duration rpcTimeout = Duration.ofHours(1L);
766748
if (timeoutSeconds > 0) {
@@ -779,12 +761,7 @@ private synchronized Spanner getClient(long timeoutSeconds) throws IOException {
779761

780762
// Cloud Spanner Client does not support global retry settings,
781763
// Thus, we need to add retry settings to each individual stub.
782-
SpannerOptions.Builder optionsBuilder =
783-
SpannerOptions.newBuilder()
784-
.setProjectId(PROJECT_ID)
785-
.setHost(HOST_PREFIX + WorkerProxy.spannerPort)
786-
.setCredentials(credentials)
787-
.setChannelProvider(channelProvider);
764+
SpannerOptions.Builder optionsBuilder = SpannerOptions.newBuilder().setProjectId(PROJECT_ID);
788765

789766
SpannerStubSettings.Builder stubSettingsBuilder =
790767
optionsBuilder.getSpannerStubSettingsBuilder();
@@ -886,6 +863,13 @@ private Status executeAction(
886863
} else if (action.hasExecutePartition()) {
887864
return executeExecutePartition(
888865
action.getExecutePartition(), outcomeSender, executionContext);
866+
} else if (action.hasPartitionedUpdate()) {
867+
if (dbPath == null) {
868+
throw SpannerExceptionFactory.newSpannerException(
869+
ErrorCode.INVALID_ARGUMENT, "Database path must be set for this action");
870+
}
871+
DatabaseClient dbClient = getClient().getDatabaseClient(DatabaseId.of(dbPath));
872+
return executePartitionedUpdate(action.getPartitionedUpdate(), dbClient, outcomeSender);
889873
} else if (action.hasCloseBatchTxn()) {
890874
return executeCloseBatchTxn(action.getCloseBatchTxn(), outcomeSender, executionContext);
891875
} else if (action.hasExecuteChangeStreamQuery()) {
@@ -1974,6 +1958,33 @@ private Status executeExecutePartition(
19741958
}
19751959
}
19761960

1961+
/** Execute a partitioned update which runs different partitions in parallel. */
1962+
private Status executePartitionedUpdate(
1963+
PartitionedUpdateAction action, DatabaseClient dbClient, OutcomeSender sender) {
1964+
try {
1965+
ExecutePartitionedUpdateOptions options = action.getOptions();
1966+
Long count =
1967+
dbClient.executePartitionedUpdate(
1968+
Statement.of(action.getUpdate().getSql()),
1969+
Options.tag(options.getTag()),
1970+
Options.priority(RpcPriority.getEnumFromProto(options.getRpcPriority())));
1971+
SpannerActionOutcome outcome =
1972+
SpannerActionOutcome.newBuilder()
1973+
.setStatus(toProto(Status.OK))
1974+
.addDmlRowsModified(count)
1975+
.build();
1976+
sender.sendOutcome(outcome);
1977+
return sender.finishWithOK();
1978+
} catch (SpannerException e) {
1979+
return sender.finishWithError(toStatus(e));
1980+
} catch (Exception e) {
1981+
return sender.finishWithError(
1982+
toStatus(
1983+
SpannerExceptionFactory.newSpannerException(
1984+
ErrorCode.INVALID_ARGUMENT, "Unexpected error: " + e.getMessage())));
1985+
}
1986+
}
1987+
19771988
/** Build a child partition record proto out of childPartitionRecord returned by client. */
19781989
private ChildPartitionsRecord buildChildPartitionRecord(Struct childPartitionRecord)
19791990
throws Exception {

0 commit comments

Comments
 (0)