Commit 3f3862e

feat: adding hadoopfs and autoscaling samples (#3262)
Adds a sample and test for submitting a HadoopFS command using the Java client library, and a sample and test for creating a Dataproc cluster with autoscaling configured. Follow-up to #2949.
1 parent 514039d commit 3f3862e

File tree

5 files changed: +482 −1 lines changed
CreateClusterWithAutoscaling.java

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * This sample creates a Dataproc cluster with an autoscaling policy enabled.
 * The policy we will be creating mirrors the following YAML representation:
 *
 *   workerConfig:
 *     minInstances: 2
 *     maxInstances: 100
 *     weight: 1
 *   secondaryWorkerConfig:
 *     minInstances: 0
 *     maxInstances: 100
 *     weight: 1
 *   basicAlgorithm:
 *     cooldownPeriod: 4m
 *     yarnConfig:
 *       scaleUpFactor: 0.05
 *       scaleDownFactor: 1.0
 *       scaleUpMinWorkerFraction: 0.0
 *       scaleDownMinWorkerFraction: 0.0
 *       gracefulDecommissionTimeout: 1h
 */

// [START dataproc_create_autoscaling_cluster]

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.AutoscalingConfig;
import com.google.cloud.dataproc.v1.AutoscalingPolicy;
import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
import com.google.cloud.dataproc.v1.BasicAutoscalingAlgorithm;
import com.google.cloud.dataproc.v1.BasicYarnAutoscalingConfig;
import com.google.cloud.dataproc.v1.Cluster;
import com.google.cloud.dataproc.v1.ClusterConfig;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.cloud.dataproc.v1.InstanceGroupAutoscalingPolicyConfig;
import com.google.cloud.dataproc.v1.InstanceGroupConfig;
import com.google.cloud.dataproc.v1.RegionName;
import com.google.protobuf.Duration;
import java.io.IOException;
import java.util.concurrent.ExecutionException;

public class CreateClusterWithAutoscaling {

  public static void createClusterWithAutoscaling() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    String autoscalingPolicyName = "your-autoscaling-policy";
    createClusterWithAutoscaling(projectId, region, clusterName, autoscalingPolicyName);
  }

  public static void createClusterWithAutoscaling(
      String projectId, String region, String clusterName, String autoscalingPolicyName)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the cluster controller client.
    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Configure the settings for the autoscaling policy service client.
    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a cluster controller client and an autoscaling policy service client with the
    // configured settings. The clients only need to be created once and can be reused for
    // multiple requests. Using a try-with-resources closes the clients, but this can also be
    // done manually with the .close() method.
    try (ClusterControllerClient clusterControllerClient =
            ClusterControllerClient.create(clusterControllerSettings);
        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {

      // Create the autoscaling policy.
      InstanceGroupAutoscalingPolicyConfig workerInstanceGroupAutoscalingPolicyConfig =
          InstanceGroupAutoscalingPolicyConfig.newBuilder()
              .setMinInstances(2)
              .setMaxInstances(100)
              .setWeight(1)
              .build();
      InstanceGroupAutoscalingPolicyConfig secondaryWorkerInstanceGroupAutoscalingPolicyConfig =
          InstanceGroupAutoscalingPolicyConfig.newBuilder()
              .setMinInstances(0)
              .setMaxInstances(100)
              .setWeight(1)
              .build();
      BasicYarnAutoscalingConfig basicYarnApplicationConfig =
          BasicYarnAutoscalingConfig.newBuilder()
              .setScaleUpFactor(0.05)
              .setScaleDownFactor(1.0)
              .setScaleUpMinWorkerFraction(0.0)
              .setScaleDownMinWorkerFraction(0.0)
              .setGracefulDecommissionTimeout(Duration.newBuilder().setSeconds(3600).build())
              .build();
      BasicAutoscalingAlgorithm basicAutoscalingAlgorithm =
          BasicAutoscalingAlgorithm.newBuilder()
              .setCooldownPeriod(Duration.newBuilder().setSeconds(240).build())
              .setYarnConfig(basicYarnApplicationConfig)
              .build();
      AutoscalingPolicy autoscalingPolicy =
          AutoscalingPolicy.newBuilder()
              .setId(autoscalingPolicyName)
              .setWorkerConfig(workerInstanceGroupAutoscalingPolicyConfig)
              .setSecondaryWorkerConfig(secondaryWorkerInstanceGroupAutoscalingPolicyConfig)
              .setBasicAlgorithm(basicAutoscalingAlgorithm)
              .build();
      RegionName parent = RegionName.of(projectId, region);

      // The policy is uploaded here.
      autoscalingPolicyServiceClient.createAutoscalingPolicy(parent, autoscalingPolicy);

      // Now the policy can be referenced when creating a cluster.
      String autoscalingPolicyUri =
          String.format(
              "projects/%s/locations/%s/autoscalingPolicies/%s",
              projectId, region, autoscalingPolicyName);
      AutoscalingConfig autoscalingConfig =
          AutoscalingConfig.newBuilder().setPolicyUri(autoscalingPolicyUri).build();

      // Configure the settings for our cluster.
      InstanceGroupConfig masterConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-1")
              .setNumInstances(1)
              .build();
      InstanceGroupConfig workerConfig =
          InstanceGroupConfig.newBuilder()
              .setMachineTypeUri("n1-standard-1")
              .setNumInstances(2)
              .build();
      ClusterConfig clusterConfig =
          ClusterConfig.newBuilder()
              .setMasterConfig(masterConfig)
              .setWorkerConfig(workerConfig)
              .setAutoscalingConfig(autoscalingConfig)
              .build();

      // Create the cluster object with the desired cluster config.
      Cluster cluster =
          Cluster.newBuilder().setClusterName(clusterName).setConfig(clusterConfig).build();

      // Create the Dataproc cluster.
      OperationFuture<Cluster, ClusterOperationMetadata> createClusterAsyncRequest =
          clusterControllerClient.createClusterAsync(projectId, region, cluster);
      Cluster response = createClusterAsyncRequest.get();

      // Print out a success message.
      System.out.printf("Cluster created successfully: %s", response.getClusterName());

    } catch (ExecutionException e) {
      // If cluster creation does not complete successfully, print the error message.
      System.err.println(String.format("createClusterWithAutoscaling: %s ", e.getMessage()));
    }
  }
}
// [END dataproc_create_autoscaling_cluster]
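
For quick manual runs, the following is a minimal, hypothetical entry point that forwards command-line arguments to the sample's four-argument overload. The runner class name and argument order are illustrative assumptions, not part of this commit.

// Hypothetical runner for the sample above; not part of this commit.
public class CreateClusterWithAutoscalingRunner {

  public static void main(String[] args) throws java.io.IOException, InterruptedException {
    if (args.length != 4) {
      System.err.println(
          "usage: CreateClusterWithAutoscalingRunner"
              + " <projectId> <region> <clusterName> <autoscalingPolicyName>");
      System.exit(1);
    }
    // Delegates to the sample; this blocks until cluster creation completes or fails.
    CreateClusterWithAutoscaling.createClusterWithAutoscaling(
        args[0], args[1], args[2], args[3]);
  }
}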
SubmitHadoopFsJob.java

Lines changed: 101 additions & 0 deletions
@@ -0,0 +1,101 @@
/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// [START dataproc_submit_hadoop_fs_job]

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.HadoopJob;
import com.google.cloud.dataproc.v1.Job;
import com.google.cloud.dataproc.v1.JobControllerClient;
import com.google.cloud.dataproc.v1.JobControllerSettings;
import com.google.cloud.dataproc.v1.JobMetadata;
import com.google.cloud.dataproc.v1.JobPlacement;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class SubmitHadoopFsJob {

  // Splits a space-separated string of FsShell arguments into a list.
  public static ArrayList<String> stringToList(String s) {
    return new ArrayList<>(Arrays.asList(s.split(" ")));
  }

  public static void submitHadoopFsJob() throws IOException, InterruptedException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String region = "your-project-region";
    String clusterName = "your-cluster-name";
    String hadoopFsQuery = "your-hadoop-fs-query";
    submitHadoopFsJob(projectId, region, clusterName, hadoopFsQuery);
  }

  public static void submitHadoopFsJob(
      String projectId, String region, String clusterName, String hadoopFsQuery)
      throws IOException, InterruptedException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);

    // Configure the settings for the job controller client.
    JobControllerSettings jobControllerSettings =
        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    // Create a job controller client with the configured settings. Using a try-with-resources
    // closes the client, but this can also be done manually with the .close() method.
    try (JobControllerClient jobControllerClient =
        JobControllerClient.create(jobControllerSettings)) {

      // Configure cluster placement for the job.
      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();

      // Configure Hadoop job settings. The HadoopFS query is set here.
      HadoopJob hadoopJob =
          HadoopJob.newBuilder()
              .setMainClass("org.apache.hadoop.fs.FsShell")
              .addAllArgs(stringToList(hadoopFsQuery))
              .build();

      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();

      // Submit an asynchronous request to execute the job.
      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);

      Job response = submitJobAsOperationAsyncRequest.get();

      // Print output from Google Cloud Storage. The job's driver output is stored in the
      // bucket referenced by driverOutputResourceUri.
      Matcher matches =
          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
      matches.matches();

      Storage storage = StorageOptions.getDefaultInstance().getService();
      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));

      System.out.println(
          String.format("Job finished successfully: %s", new String(blob.getContent())));

    } catch (ExecutionException e) {
      // If the job does not complete successfully, print the error message.
      System.err.println(String.format("submitHadoopFsJob: %s ", e.getMessage()));
    }
  }
}
// [END dataproc_submit_hadoop_fs_job]
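
Because stringToList splits hadoopFsQuery on single spaces and passes the tokens straight to org.apache.hadoop.fs.FsShell, the query takes the same shape as arguments to "hadoop fs". A minimal, hypothetical invocation follows; the project, region, and cluster values are placeholders.

// Hypothetical invocation of the sample above; all values are placeholders.
public class SubmitHadoopFsJobExample {

  public static void main(String[] args) throws java.io.IOException, InterruptedException {
    // "-ls /" runs the FsShell equivalent of `hadoop fs -ls /` on the cluster,
    // listing the root of the cluster's default filesystem.
    SubmitHadoopFsJob.submitHadoopFsJob(
        "your-project-id", "us-central1", "your-cluster-name", "-ls /");
  }
}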
CreateClusterWithAutoscalingTest.java

Lines changed: 105 additions & 0 deletions
@@ -0,0 +1,105 @@
/*
 * Copyright 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import static junit.framework.TestCase.assertNotNull;
import static org.hamcrest.MatcherAssert.assertThat;

import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.dataproc.v1.AutoscalingPolicyName;
import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceClient;
import com.google.cloud.dataproc.v1.AutoscalingPolicyServiceSettings;
import com.google.cloud.dataproc.v1.ClusterControllerClient;
import com.google.cloud.dataproc.v1.ClusterControllerSettings;
import com.google.cloud.dataproc.v1.ClusterOperationMetadata;
import com.google.protobuf.Empty;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class CreateClusterWithAutoscalingTest {

  private static final String CLUSTER_NAME =
      String.format("java-as-test-%s", UUID.randomUUID().toString());
  private static final String REGION = "us-central1";
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
  private static final String AUTOSCALING_POLICY_NAME =
      String.format("java-as-test-%s", UUID.randomUUID().toString());

  private ByteArrayOutputStream bout;

  private static void requireEnv(String varName) {
    assertNotNull(
        String.format("Environment variable '%s' is required to perform these tests.", varName),
        System.getenv(varName));
  }

  @BeforeClass
  public static void checkRequirements() {
    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
    requireEnv("GOOGLE_CLOUD_PROJECT");
  }

  @Before
  public void setUp() {
    bout = new ByteArrayOutputStream();
    System.setOut(new PrintStream(bout));
  }

  @After
  public void tearDown() throws IOException, InterruptedException, ExecutionException {
    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);

    ClusterControllerSettings clusterControllerSettings =
        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();

    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();

    try (ClusterControllerClient clusterControllerClient =
            ClusterControllerClient.create(clusterControllerSettings);
        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {

      // Delete the cluster first; an autoscaling policy cannot be deleted while a
      // cluster still references it.
      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
      deleteClusterAsyncRequest.get();

      AutoscalingPolicyName name =
          AutoscalingPolicyName.ofProjectLocationAutoscalingPolicyName(
              PROJECT_ID, REGION, AUTOSCALING_POLICY_NAME);
      autoscalingPolicyServiceClient.deleteAutoscalingPolicy(name);
    }
  }

  @Test
  public void createClusterWithAutoscalingTest() throws IOException, InterruptedException {
    CreateClusterWithAutoscaling.createClusterWithAutoscaling(
        PROJECT_ID, REGION, CLUSTER_NAME, AUTOSCALING_POLICY_NAME);
    String output = bout.toString();

    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
  }
}

dataproc/src/test/java/InstantiateInlineWorkflowTemplateTest.java

Lines changed: 0 additions & 1 deletion
@@ -30,7 +30,6 @@
 @RunWith(JUnit4.class)
 public class InstantiateInlineWorkflowTemplateTest {
 
-
   private static final String REGION = "us-central1";
   private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
 
