Skip to content

Commit f78f8c0

Browse files
bradmirochingor13
authored and committed
samples: feat: dataproc quickstart and createcluster (#1908)
* Added dataproc quickstart samples * Fixed linting, string formatting, copyrights * added overloaded functions to all samples * Formatting changes * small bug fixes * Fixed CreateCluster sample and added Quickstart * Added quickstart sample * Added dataproc quickstart samples * Fixed linting, string formatting, copyrights * added overloaded functions to all samples * Formatting changes * small bug fixes * Fixed CreateCluster sample and added Quickstart * Added quickstart sample * Updates to createCluster and quickstart * Fixed quickstart and tests * Changes to tests * Added periods to comments * Fixed pom and added handling for ExecutionException * Fixed lint errors * Fixed linting errors
1 parent 1d2fb6b commit f78f8c0

File tree

4 files changed

+339
-56
lines changed

4 files changed

+339
-56
lines changed
Lines changed: 39 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 Google Inc.
2+
* Copyright 2019 Google LLC
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,57 +27,58 @@
2727

2828
public class CreateCluster {
2929

30+
public static void createCluster() throws IOException, InterruptedException {
31+
// TODO(developer): Replace these variables before running the sample.
32+
String projectId = "your-project-id";
33+
String region = "your-project-region";
34+
String clusterName = "your-cluster-name";
35+
createCluster(projectId, region, clusterName);
36+
}
37+
3038
public static void createCluster(String projectId, String region, String clusterName)
3139
throws IOException, InterruptedException {
3240
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
3341

34-
// Configure the settings for the cluster controller client
42+
// Configure the settings for the cluster controller client.
3543
ClusterControllerSettings clusterControllerSettings =
3644
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
3745

38-
// Create a cluster controller client with the configured settings. We only need to create
39-
// the client once, and can be reused for multiple requests. Using a try-with-resources
40-
// will close the client for us, but this can also be done manually with the .close() method.
41-
try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
42-
.create(clusterControllerSettings)) {
43-
// Configure the settings for our cluster
44-
InstanceGroupConfig masterConfig = InstanceGroupConfig.newBuilder()
45-
.setMachineTypeUri("n1-standard-1")
46-
.setNumInstances(1)
47-
.build();
48-
InstanceGroupConfig workerConfig = InstanceGroupConfig.newBuilder()
49-
.setMachineTypeUri("n1-standard-1")
50-
.setNumInstances(2)
51-
.build();
52-
ClusterConfig clusterConfig = ClusterConfig.newBuilder()
53-
.setMasterConfig(masterConfig)
54-
.setWorkerConfig(workerConfig)
55-
.build();
56-
// Create the cluster object with the desired cluster config
57-
Cluster cluster = Cluster.newBuilder()
58-
.setClusterName(clusterName)
59-
.setConfig(clusterConfig)
60-
.build();
46+
// Create a cluster controller client with the configured settings. The client only needs to be
47+
// created once and can be reused for multiple requests. Using a try-with-resources
48+
// closes the client, but this can also be done manually with the .close() method.
49+
try (ClusterControllerClient clusterControllerClient =
50+
ClusterControllerClient.create(clusterControllerSettings)) {
51+
// Configure the settings for our cluster.
52+
InstanceGroupConfig masterConfig =
53+
InstanceGroupConfig.newBuilder()
54+
.setMachineTypeUri("n1-standard-1")
55+
.setNumInstances(1)
56+
.build();
57+
InstanceGroupConfig workerConfig =
58+
InstanceGroupConfig.newBuilder()
59+
.setMachineTypeUri("n1-standard-1")
60+
.setNumInstances(2)
61+
.build();
62+
ClusterConfig clusterConfig =
63+
ClusterConfig.newBuilder()
64+
.setMasterConfig(masterConfig)
65+
.setWorkerConfig(workerConfig)
66+
.build();
67+
// Create the cluster object with the desired cluster config.
68+
Cluster cluster =
69+
Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
6170

62-
// Create the Cloud Dataproc cluster
71+
// Create the Cloud Dataproc cluster.
6372
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
6473
clusterControllerClient.createClusterAsync(projectId, region, cluster);
6574
Cluster response = createClusterAsyncRequest.get();
6675

67-
// Print out a success message
68-
System.out.println(
69-
String.format("Cluster created successfully: %s", response.getClusterName())
70-
);
76+
// Print out a success message.
77+
System.out.printf("Cluster created successfully: %s", response.getClusterName());
7178

72-
} catch (IOException e) {
73-
// Likely this would occur due to issues authenticating with GCP. Make sure the environment
74-
// variable GOOGLE_APPLICATION_CREDENTIALS is configured.
75-
System.out.println("Error creating the cluster controller client: \n" + e.toString());
7679
} catch (ExecutionException e) {
77-
// Common issues for this include needing to increase compute engine quotas or a cluster of
78-
// the same name already exists.
79-
System.out.println("Error during cluster creation request: \n" + e.toString());
80+
System.err.println(String.format("Error executing createCluster: %s ", e.getMessage()));
8081
}
8182
}
8283
}
83-
// [END dataproc_create_cluster]
84+
// [END dataproc_create_cluster]
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
/*
2+
* Copyright 2019 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// [START dataproc_quickstart]
18+
import com.google.api.gax.longrunning.OperationFuture;
19+
import com.google.cloud.dataproc.v1.Cluster;
20+
import com.google.cloud.dataproc.v1.ClusterConfig;
21+
import com.google.cloud.dataproc.v1.ClusterControllerClient;
22+
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
23+
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
24+
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
25+
import com.google.cloud.dataproc.v1.Job;
26+
import com.google.cloud.dataproc.v1.JobControllerClient;
27+
import com.google.cloud.dataproc.v1.JobControllerSettings;
28+
import com.google.cloud.dataproc.v1.JobPlacement;
29+
import com.google.cloud.dataproc.v1.PySparkJob;
30+
import com.google.cloud.storage.Blob;
31+
import com.google.cloud.storage.Storage;
32+
import com.google.cloud.storage.StorageOptions;
33+
import com.google.protobuf.Empty;
34+
import java.io.IOException;
35+
import java.util.concurrent.CompletableFuture;
36+
import java.util.concurrent.ExecutionException;
37+
import java.util.concurrent.TimeUnit;
38+
import java.util.concurrent.TimeoutException;
39+
40+
public class Quickstart {
41+
42+
public static Job waitForJobCompletion(
43+
JobControllerClient jobControllerClient, String projectId, String region, String jobId) {
44+
while (true) {
45+
// Poll the service periodically until the Job is in a finished state.
46+
Job jobInfo = jobControllerClient.getJob(projectId, region, jobId);
47+
switch (jobInfo.getStatus().getState()) {
48+
case DONE:
49+
case CANCELLED:
50+
case ERROR:
51+
return jobInfo;
52+
default:
53+
try {
54+
// Wait a second in between polling attempts.
55+
TimeUnit.SECONDS.sleep(1);
56+
} catch (InterruptedException e) {
57+
throw new RuntimeException(e);
58+
}
59+
}
60+
}
61+
}
62+
63+
public static void quickstart() throws IOException, InterruptedException {
64+
// TODO(developer): Replace these variables before running the sample.
65+
String projectId = "your-project-id";
66+
String region = "your-project-region";
67+
String clusterName = "your-cluster-name";
68+
String jobFilePath = "your-job-file-path";
69+
quickstart(projectId, region, clusterName, jobFilePath);
70+
}
71+
72+
public static void quickstart(
73+
String projectId, String region, String clusterName, String jobFilePath)
74+
throws IOException, InterruptedException {
75+
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
76+
77+
// Configure the settings for the cluster controller client.
78+
ClusterControllerSettings clusterControllerSettings =
79+
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
80+
81+
// Configure the settings for the job controller client.
82+
JobControllerSettings jobControllerSettings =
83+
JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
84+
85+
// Create both a cluster controller client and job controller client with the configured
86+
// settings. The client only needs to be created once and can be reused for multiple requests.
87+
// Using a try-with-resources closes the client, but this can also be done manually with
88+
// the .close() method.
89+
try (ClusterControllerClient clusterControllerClient =
90+
ClusterControllerClient.create(clusterControllerSettings);
91+
JobControllerClient jobControllerClient =
92+
JobControllerClient.create(jobControllerSettings)) {
93+
// Configure the settings for our cluster.
94+
InstanceGroupConfig masterConfig =
95+
InstanceGroupConfig.newBuilder()
96+
.setMachineTypeUri("n1-standard-1")
97+
.setNumInstances(1)
98+
.build();
99+
InstanceGroupConfig workerConfig =
100+
InstanceGroupConfig.newBuilder()
101+
.setMachineTypeUri("n1-standard-1")
102+
.setNumInstances(2)
103+
.build();
104+
ClusterConfig clusterConfig =
105+
ClusterConfig.newBuilder()
106+
.setMasterConfig(masterConfig)
107+
.setWorkerConfig(workerConfig)
108+
.build();
109+
// Create the cluster object with the desired cluster config.
110+
Cluster cluster =
111+
Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();
112+
113+
// Create the Cloud Dataproc cluster.
114+
OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
115+
clusterControllerClient.createClusterAsync(projectId, region, cluster);
116+
Cluster response = createClusterAsyncRequest.get();
117+
System.out.printf("Cluster created successfully: %s", response.getClusterName());
118+
119+
// Configure the settings for our job.
120+
JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
121+
PySparkJob pySparkJob = PySparkJob.newBuilder().setMainPythonFileUri(jobFilePath).build();
122+
Job job = Job.newBuilder().setPlacement(jobPlacement).setPysparkJob(pySparkJob).build();
123+
124+
// Submit an asynchronous request to execute the job.
125+
Job request = jobControllerClient.submitJob(projectId, region, job);
126+
String jobId = request.getReference().getJobId();
127+
System.out.println(String.format("Submitted job \"%s\"", jobId));
128+
129+
// Wait for the job to finish.
130+
CompletableFuture<Job> finishedJobFuture =
131+
CompletableFuture.supplyAsync(
132+
() -> waitForJobCompletion(jobControllerClient, projectId, region, jobId));
133+
int timeout = 10;
134+
try {
135+
Job jobInfo = finishedJobFuture.get(timeout, TimeUnit.MINUTES);
136+
System.out.printf("Job %s finished successfully.", jobId);
137+
138+
// Cloud Dataproc job output gets saved to a GCS bucket allocated to it.
139+
Cluster clusterInfo = clusterControllerClient.getCluster(projectId, region, clusterName);
140+
Storage storage = StorageOptions.getDefaultInstance().getService();
141+
Blob blob =
142+
storage.get(
143+
clusterInfo.getConfig().getConfigBucket(),
144+
String.format(
145+
"google-cloud-dataproc-metainfo/%s/jobs/%s/driveroutput.000000000",
146+
clusterInfo.getClusterUuid(), jobId));
147+
System.out.println(
148+
String.format(
149+
"Job \"%s\" finished with state %s:\n%s",
150+
jobId, jobInfo.getStatus().getState(), new String(blob.getContent())));
151+
} catch (TimeoutException e) {
152+
System.err.println(
153+
String.format("Job timed out after %d minutes: %s", timeout, e.getMessage()));
154+
}
155+
156+
// Delete the cluster.
157+
OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
158+
clusterControllerClient.deleteClusterAsync(projectId, region, clusterName);
159+
deleteClusterAsyncRequest.get();
160+
System.out.println(String.format("Cluster \"%s\" successfully deleted.", clusterName));
161+
162+
} catch (ExecutionException e) {
163+
System.err.println(String.format("Error executing quickstart: %s ", e.getMessage()));
164+
}
165+
}
166+
}
167+
// [END dataproc_quickstart]

dataproc/snippets/src/test/java/CreateClusterTest.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,17 @@
3838
@RunWith(JUnit4.class)
3939
public class CreateClusterTest {
4040

41-
private static final String BASE_CLUSTER_NAME = "test-cluster";
41+
private static final String CLUSTER_NAME =
42+
String.format("java-cc-test-%s", UUID.randomUUID().toString());
4243
private static final String REGION = "us-central1";
44+
private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
4345

44-
private static String projectId = System.getenv("GOOGLE_CLOUD_PROJECT");
45-
private String clusterName;
4646
private ByteArrayOutputStream bout;
4747

4848
private static void requireEnv(String varName) {
4949
assertNotNull(
50-
System.getenv(varName),
51-
String.format("Environment variable '%s' is required to perform these tests.", varName)
52-
);
50+
String.format("Environment variable '%s' is required to perform these tests.", varName),
51+
System.getenv(varName));
5352
}
5453

5554
@BeforeClass
@@ -60,35 +59,30 @@ public static void checkRequirements() {
6059

6160
@Before
6261
public void setUp() {
63-
clusterName = String.format("%s-%s", BASE_CLUSTER_NAME, UUID.randomUUID().toString());
64-
6562
bout = new ByteArrayOutputStream();
6663
System.setOut(new PrintStream(bout));
6764
}
6865

6966
@Test
7067
public void createClusterTest() throws IOException, InterruptedException {
71-
CreateCluster.createCluster(projectId, REGION, clusterName);
68+
CreateCluster.createCluster(PROJECT_ID, REGION, CLUSTER_NAME);
7269
String output = bout.toString();
7370

74-
assertThat(output, CoreMatchers.containsString(clusterName));
71+
assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
7572
}
7673

7774
@After
78-
public void tearDown() throws IOException, InterruptedException {
75+
public void tearDown() throws IOException, InterruptedException, ExecutionException {
7976
String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
8077

8178
ClusterControllerSettings clusterControllerSettings =
8279
ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
8380

84-
try (ClusterControllerClient clusterControllerClient = ClusterControllerClient
85-
.create(clusterControllerSettings)) {
81+
try (ClusterControllerClient clusterControllerClient =
82+
ClusterControllerClient.create(clusterControllerSettings)) {
8683
OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
87-
clusterControllerClient.deleteClusterAsync(projectId, REGION, clusterName);
84+
clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
8885
deleteClusterAsyncRequest.get();
89-
90-
} catch (ExecutionException e) {
91-
System.out.println("Error during cluster deletion: \n" + e.toString());
9286
}
9387
}
94-
}
88+
}

0 commit comments

Comments
 (0)