22
22
import com .google .api .gax .paging .Page ;
23
23
import com .google .api .gax .retrying .RetrySettings ;
24
24
import com .google .api .gax .rpc .DeadlineExceededException ;
25
+ import com .google .api .gax .rpc .TransportChannelProvider ;
25
26
import com .google .api .gax .rpc .UnavailableException ;
27
+ import com .google .auth .Credentials ;
28
+ import com .google .auth .oauth2 .GoogleCredentials ;
26
29
import com .google .cloud .ByteArray ;
27
30
import com .google .cloud .Date ;
31
+ import com .google .cloud .NoCredentials ;
28
32
import com .google .cloud .Timestamp ;
29
33
import com .google .cloud .spanner .Backup ;
30
34
import com .google .cloud .spanner .BatchClient ;
144
148
import com .google .spanner .v1 .TypeCode ;
145
149
import io .grpc .Status ;
146
150
import io .grpc .stub .StreamObserver ;
151
+ import java .io .ByteArrayInputStream ;
152
+ import java .io .File ;
147
153
import java .io .IOException ;
148
154
import java .io .ObjectInputStream ;
149
155
import java .io .ObjectOutputStream ;
162
168
import java .util .logging .Logger ;
163
169
import java .util .stream .Collectors ;
164
170
import javax .annotation .Nullable ;
171
+ import org .apache .commons .io .FileUtils ;
165
172
import org .jetbrains .annotations .NotNull ;
166
173
import org .threeten .bp .Duration ;
167
174
import org .threeten .bp .LocalDate ;
@@ -743,6 +750,20 @@ private synchronized Spanner getClient() throws IOException {
743
750
744
751
// Return the spanner client, create one if not exists.
745
752
private synchronized Spanner getClient (long timeoutSeconds ) throws IOException {
753
+ // Create a cloud spanner client
754
+ Credentials credentials ;
755
+ if (WorkerProxy .serviceKeyFile .isEmpty ()) {
756
+ credentials = NoCredentials .getInstance ();
757
+ } else {
758
+ credentials =
759
+ GoogleCredentials .fromStream (
760
+ new ByteArrayInputStream (
761
+ FileUtils .readFileToByteArray (new File (WorkerProxy .serviceKeyFile ))),
762
+ HTTP_TRANSPORT_FACTORY );
763
+ }
764
+
765
+ TransportChannelProvider channelProvider =
766
+ CloudUtil .newChannelProviderHelper (WorkerProxy .spannerPort );
746
767
747
768
Duration rpcTimeout = Duration .ofHours (1L );
748
769
if (timeoutSeconds > 0 ) {
@@ -761,7 +782,12 @@ private synchronized Spanner getClient(long timeoutSeconds) throws IOException {
761
782
762
783
// Cloud Spanner Client does not support global retry settings,
763
784
// Thus, we need to add retry settings to each individual stub.
764
- SpannerOptions .Builder optionsBuilder = SpannerOptions .newBuilder ().setProjectId (PROJECT_ID );
785
+ SpannerOptions .Builder optionsBuilder =
786
+ SpannerOptions .newBuilder ()
787
+ .setProjectId (PROJECT_ID )
788
+ .setHost (HOST_PREFIX + WorkerProxy .spannerPort )
789
+ .setCredentials (credentials )
790
+ .setChannelProvider (channelProvider );
765
791
766
792
SpannerStubSettings .Builder stubSettingsBuilder =
767
793
optionsBuilder .getSpannerStubSettingsBuilder ();
@@ -1967,7 +1993,7 @@ private Status executePartitionedUpdate(
1967
1993
dbClient .executePartitionedUpdate (
1968
1994
Statement .of (action .getUpdate ().getSql ()),
1969
1995
Options .tag (options .getTag ()),
1970
- Options .priority (RpcPriority .getEnumFromProto (options .getRpcPriority ())));
1996
+ Options .priority (RpcPriority .fromProto (options .getRpcPriority ())));
1971
1997
SpannerActionOutcome outcome =
1972
1998
SpannerActionOutcome .newBuilder ()
1973
1999
.setStatus (toProto (Status .OK ))
0 commit comments