Commit e7ddc78

Added autoscaling test and small fixes

1 parent 851bc2e commit e7ddc78
3 files changed, +202 −99 lines changed

dataproc/src/main/java/CreateClusterWithAutoscaling.java

Lines changed: 43 additions & 40 deletions
@@ -1,38 +1,38 @@
 /*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- * This sample creates a Dataproc cluster with an autoscaling policy enabled. The policy we will be creating mirrors
- * the following YAML representation:
- *
- workerConfig:
-   minInstances: 2
-   maxInstances: 100
-   weight: 1
- secondaryWorkerConfig:
-   minInstances: 0
-   maxInstances: 100
-   weight: 1
- basicAlgorithm:
-   cooldownPeriod: 4m
-   yarnConfig:
-     scaleUpFactor: 0.05
-     scaleDownFactor: 1.0
-     scaleUpMinWorkerFraction: 0.0
-     scaleDownMinWorkerFraction: 0.0
-     gracefulDecommissionTimeout: 1h
- */
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * This sample creates a Dataproc cluster with an autoscaling policy enabled. The policy we will be creating mirrors
+ * the following YAML representation:
+ *
+ workerConfig:
+   minInstances: 2
+   maxInstances: 100
+   weight: 1
+ secondaryWorkerConfig:
+   minInstances: 0
+   maxInstances: 100
+   weight: 1
+ basicAlgorithm:
+   cooldownPeriod: 4m
+   yarnConfig:
+     scaleUpFactor: 0.05
+     scaleDownFactor: 1.0
+     scaleUpMinWorkerFraction: 0.0
+     scaleDownMinWorkerFraction: 0.0
+     gracefulDecommissionTimeout: 1h
+ */

 // [START dataproc_create_autoscaling_cluster]
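For orientation (not part of the diff): a minimal sketch of how the YAML above maps onto the com.google.cloud.dataproc.v1 builders this sample uses. The durations become protobuf Durations in seconds (4m = 240s, 1h = 3600s), and the policy id is a hypothetical placeholder.

import com.google.cloud.dataproc.v1.AutoscalingPolicy;
import com.google.cloud.dataproc.v1.BasicAutoscalingAlgorithm;
import com.google.cloud.dataproc.v1.BasicYarnAutoscalingConfig;
import com.google.cloud.dataproc.v1.InstanceGroupAutoscalingPolicyConfig;
import com.google.protobuf.Duration;

public class AutoscalingPolicySketch {
  // Builds the same policy the YAML above describes.
  public static AutoscalingPolicy build() {
    InstanceGroupAutoscalingPolicyConfig workerConfig =
        InstanceGroupAutoscalingPolicyConfig.newBuilder()
            .setMinInstances(2)
            .setMaxInstances(100)
            .setWeight(1)
            .build();
    InstanceGroupAutoscalingPolicyConfig secondaryWorkerConfig =
        InstanceGroupAutoscalingPolicyConfig.newBuilder()
            .setMinInstances(0)
            .setMaxInstances(100)
            .setWeight(1)
            .build();
    BasicYarnAutoscalingConfig yarnConfig =
        BasicYarnAutoscalingConfig.newBuilder()
            .setScaleUpFactor(0.05)
            .setScaleDownFactor(1.0)
            .setScaleUpMinWorkerFraction(0.0)
            .setScaleDownMinWorkerFraction(0.0)
            // 1h graceful decommission timeout, as protobuf seconds.
            .setGracefulDecommissionTimeout(Duration.newBuilder().setSeconds(3600).build())
            .build();
    return AutoscalingPolicy.newBuilder()
        .setId("my-autoscaling-policy") // hypothetical id, not from the commit
        .setWorkerConfig(workerConfig)
        .setSecondaryWorkerConfig(secondaryWorkerConfig)
        .setBasicAlgorithm(
            BasicAutoscalingAlgorithm.newBuilder()
                // 4m cooldown between scaling events.
                .setCooldownPeriod(Duration.newBuilder().setSeconds(240).build())
                .setYarnConfig(yarnConfig)
                .build())
        .build();
  }
}

The weight fields set the relative proportion of workers the autoscaler places in the primary versus the secondary (preemptible) group when both can scale.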

@@ -67,9 +67,11 @@ public static void createClusterwithAutoscaling(
     AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
         AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();

-    // Create a cluster controller client and an autoscaling controller clien with the configured
-    // settings. The clients only need to be created once and can be reused for multiple requests. Using a
-    // try-with-resources closes the client, but this can also be done manually with the .close() method.
+    // Create a cluster controller client and an autoscaling controller client with the configured
+    // settings. The clients only need to be created once and can be reused for multiple requests.
+    // Using a
+    // try-with-resources closes the client, but this can also be done manually with the .close()
+    // method.
     try (ClusterControllerClient clusterControllerClient =
             ClusterControllerClient.create(clusterControllerSettings);
         AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
@@ -113,10 +115,11 @@ public static void createClusterwithAutoscaling(
       // Policy is uploaded here.
       autoscalingPolicyServiceClient.createAutoscalingPolicy(parent, autoscalingPolicy);

-      // Now the policy can be referrenced when creating a cluster.
+      // Now the policy can be referenced when creating a cluster.
       String autoscalingPolicyURI =
           String.format(
-              "projects/%s/locations/%s/autoscalingPolicies/%s", projectId, region, autoscalingPolicyName);
+              "projects/%s/locations/%s/autoscalingPolicies/%s",
+              projectId, region, autoscalingPolicyName);
       AutoscalingConfig autoscalingConfig =
           AutoscalingConfig.newBuilder().setPolicyUri(autoscalingPolicyURI).build();
@@ -151,7 +154,7 @@ public static void createClusterwithAutoscaling(
       System.out.printf("Cluster created successfully: %s", response.getClusterName());

     } catch (ExecutionException e) {
-      System.err.println(String.format("Error executing createCluster: %s ", e.getMessage()));
+      System.err.println(String.format("createClusterWithAutoscaling: %s ", e.getMessage()));
     }
   }
 }
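A hedged usage sketch for the sample's entry point; all four argument values are placeholders, not from this commit:

public class Driver {
  public static void main(String[] args)
      throws java.io.IOException, InterruptedException {
    // Creates the autoscaling policy, then a cluster that references it.
    CreateClusterWithAutoscaling.createClusterwithAutoscaling(
        "my-project-id", "us-central1", "my-cluster", "my-autoscaling-policy");
  }
}

With these placeholders, the autoscalingPolicyURI built above would read projects/my-project-id/locations/us-central1/autoscalingPolicies/my-autoscaling-policy.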

dataproc/src/main/java/SubmitHadoopFSJob.java

Lines changed: 61 additions & 59 deletions
@@ -30,64 +30,66 @@

 public class SubmitHadoopFSJob {

-  public static ArrayList<String> stringToList(String s) {
-    return new ArrayList<>(Arrays.asList(s.split(" ")));
-  }
-
-  public static void submitHadoopFSQuery() throws IOException, InterruptedException {
-    // TODO(developer): Replace these variables before running the sample.
-    String projectId = "your-project-id";
-    String region = "your-project-region";
-    String clusterName = "your-cluster-name";
-    String hadoopFSQuery = "your-hadoop-fs-query";
-    submitHadoopFSJob(projectId, region, clusterName, hadoopFSQuery);
-  }
-
-  public static void submitHadoopFSJob(
-      String projectId, String region, String clusterName, String hadoopFSQuery)
-      throws IOException, InterruptedException {
-    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
-
-    // Configure the settings for the job controller client.
-    JobControllerSettings jobControllerSettings =
-        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
-
-    // Create a job controller client with the configured settings. Using a try-with-resources closes the client,
-    // but this can also be done manually with the .close() method.
-    try (JobControllerClient jobControllerClient =
-        JobControllerClient.create(jobControllerSettings)) {
-
-      // Configure cluster placement for the job.
-      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
-
-      // Configure Hadoop job settings. The HadoopFS query is set here.
-      HadoopJob hadoopJob = HadoopJob.newBuilder()
-          .setMainClass("org.apache.hadoop.fs.FsShell")
-          .addAllArgs(stringToList(hadoopFSQuery))
-          .build();
-
-      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();
-
-      // Submit an asynchronous request to execute the job.
-      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
-          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
-
-      Job response = submitJobAsOperationAsyncRequest.get();
-
-      // Print output from Google Cloud Storage
-      Matcher matches = Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
-      matches.matches();
-
-      Storage storage = StorageOptions.getDefaultInstance().getService();
-      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
-
-      System.out.println(String.format("Job \"%s\" finished: %s",
-          response.getReference().getJobId(),
-          new String(blob.getContent())));
-
-    } catch (ExecutionException e) {
-      System.err.println(String.format("submitHadoopFSJob: %s ", e.getMessage()));
-    }
+  public static ArrayList<String> stringToList(String s) {
+    return new ArrayList<>(Arrays.asList(s.split(" ")));
+  }
+
+  public static void submitHadoopFSQuery() throws IOException, InterruptedException {
+    // TODO(developer): Replace these variables before running the sample.
+    String projectId = "your-project-id";
+    String region = "your-project-region";
+    String clusterName = "your-cluster-name";
+    String hadoopFSQuery = "your-hadoop-fs-query";
+    submitHadoopFSJob(projectId, region, clusterName, hadoopFSQuery);
+  }
+
+  public static void submitHadoopFSJob(
+      String projectId, String region, String clusterName, String hadoopFSQuery)
+      throws IOException, InterruptedException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", region);
+
+    // Configure the settings for the job controller client.
+    JobControllerSettings jobControllerSettings =
+        JobControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    // Create a job controller client with the configured settings. Using a try-with-resources
+    // closes the client,
+    // but this can also be done manually with the .close() method.
+    try (JobControllerClient jobControllerClient =
+        JobControllerClient.create(jobControllerSettings)) {
+
+      // Configure cluster placement for the job.
+      JobPlacement jobPlacement = JobPlacement.newBuilder().setClusterName(clusterName).build();
+
+      // Configure Hadoop job settings. The HadoopFS query is set here.
+      HadoopJob hadoopJob =
+          HadoopJob.newBuilder()
+              .setMainClass("org.apache.hadoop.fs.FsShell")
+              .addAllArgs(stringToList(hadoopFSQuery))
+              .build();
+
+      Job job = Job.newBuilder().setPlacement(jobPlacement).setHadoopJob(hadoopJob).build();
+
+      // Submit an asynchronous request to execute the job.
+      OperationFuture<Job, JobMetadata> submitJobAsOperationAsyncRequest =
+          jobControllerClient.submitJobAsOperationAsync(projectId, region, job);
+
+      Job response = submitJobAsOperationAsyncRequest.get();
+
+      // Print output from Google Cloud Storage
+      Matcher matches =
+          Pattern.compile("gs://(.*?)/(.*)").matcher(response.getDriverOutputResourceUri());
+      matches.matches();
+
+      Storage storage = StorageOptions.getDefaultInstance().getService();
+      Blob blob = storage.get(matches.group(1), String.format("%s.000000000", matches.group(2)));
+
+      System.out.println(
+          String.format("Job finished successfully: %s", new String(blob.getContent())));
+
     } catch (ExecutionException e) {
       System.err.println(String.format("submitHadoopFSJob: %s ", e.getMessage()));
     }
+  }
 }
-// [END dataproc_submit_hadoop_fs_job]
+// [END dataproc_submit_hadoop_fs_job]
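A hedged usage sketch for this sample; "-ls /" is a hypothetical FsShell query, not from the commit. stringToList splits the query on single spaces, and the resulting list becomes the argument vector for org.apache.hadoop.fs.FsShell:

import java.io.IOException;

public class SubmitHadoopFSJobSketch {
  public static void main(String[] args) throws IOException, InterruptedException {
    // Placeholder values; "-ls /" lists the root of the cluster's default filesystem.
    // stringToList("-ls /") yields ["-ls", "/"].
    SubmitHadoopFSJob.submitHadoopFSJob(
        "my-project-id", "us-central1", "my-cluster", "-ls /");
  }
}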
dataproc/src/test/java/CreateClusterWithAutoscalingTest.java

Lines changed: 98 additions & 0 deletions
@@ -1,2 +1,100 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static junit.framework.TestCase.assertNotNull;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.dataproc.v1.*;
+import com.google.protobuf.Empty;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import org.hamcrest.CoreMatchers;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
 public class CreateClusterWithAutoscalingTest {
+
+  private static final String CLUSTER_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+  private static final String REGION = "us-central1";
+  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
+  private static final String AUTOSCALING_POLICY_NAME =
+      String.format("java-as-test-%s", UUID.randomUUID().toString());
+
+  private ByteArrayOutputStream bout;
+
+  private static void requireEnv(String varName) {
+    assertNotNull(
+        String.format("Environment variable '%s' is required to perform these tests.", varName),
+        System.getenv(varName));
+  }
+
+  @BeforeClass
+  public static void checkRequirements() {
+    requireEnv("GOOGLE_APPLICATION_CREDENTIALS");
+    requireEnv("GOOGLE_CLOUD_PROJECT");
+  }
+
+  @Before
+  public void setUp() {
+    bout = new ByteArrayOutputStream();
+    System.setOut(new PrintStream(bout));
+  }
+
+  @After
+  public void tearDown() throws IOException, InterruptedException, ExecutionException {
+    String myEndpoint = String.format("%s-dataproc.googleapis.com:443", REGION);
+
+    ClusterControllerSettings clusterControllerSettings =
+        ClusterControllerSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    AutoscalingPolicyServiceSettings autoscalingPolicyServiceSettings =
+        AutoscalingPolicyServiceSettings.newBuilder().setEndpoint(myEndpoint).build();
+
+    try (ClusterControllerClient clusterControllerClient =
+            ClusterControllerClient.create(clusterControllerSettings);
+        AutoscalingPolicyServiceClient autoscalingPolicyServiceClient =
+            AutoscalingPolicyServiceClient.create(autoscalingPolicyServiceSettings)) {
+
+      OperationFuture<Empty, ClusterOperationMetadata> deleteClusterAsyncRequest =
+          clusterControllerClient.deleteClusterAsync(PROJECT_ID, REGION, CLUSTER_NAME);
+      deleteClusterAsyncRequest.get();
+
+      AutoscalingPolicyName name =
+          AutoscalingPolicyName.ofProjectLocationAutoscalingPolicyName(
+              PROJECT_ID, REGION, AUTOSCALING_POLICY_NAME);
+      autoscalingPolicyServiceClient.deleteAutoscalingPolicy(name);
+    }
+  }
+
+  @Test
+  public void createClusterWithAutoscalingTest() throws IOException, InterruptedException {
+    CreateClusterWithAutoscaling.createClusterwithAutoscaling(
+        PROJECT_ID, REGION, CLUSTER_NAME, AUTOSCALING_POLICY_NAME);
+    String output = bout.toString();
+
+    assertThat(output, CoreMatchers.containsString(CLUSTER_NAME));
+  }
 }
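The test reads GOOGLE_CLOUD_PROJECT and GOOGLE_APPLICATION_CREDENTIALS from the environment and cleans up the cluster and policy in tearDown. A minimal sketch for invoking it outside a build tool, using JUnit 4's JUnitCore, assuming the test class and its dependencies are on the classpath:

import org.junit.runner.JUnitCore;
import org.junit.runner.Result;
import org.junit.runner.notification.Failure;

public class RunAutoscalingTest {
  public static void main(String[] args) {
    // Assumes GOOGLE_CLOUD_PROJECT and GOOGLE_APPLICATION_CREDENTIALS are set.
    Result result = JUnitCore.runClasses(CreateClusterWithAutoscalingTest.class);
    for (Failure failure : result.getFailures()) {
      System.err.println(failure.toString());
    }
    System.out.printf(
        "Ran %d test(s), %d failed%n", result.getRunCount(), result.getFailureCount());
  }
}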
