Skip to content

Commit 7e5b133

Browse files
chenxyEthanShouhanCheng
authored and committed
feature: Add EMRStep support in Sagemaker pipeline
1 parent a71105b commit 7e5b133

File tree

4 files changed

+2
-95
lines changed

4 files changed

+2
-95
lines changed

src/sagemaker/workflow/emr_step.py

Lines changed: 0 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -68,11 +68,7 @@ def __init__(
6868
name: str,
6969
display_name: str,
7070
description: str,
71-
<<<<<<< HEAD
72-
job_flow_id: str,
73-
=======
7471
cluster_id: str,
75-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
7672
step_config: EMRStepConfig,
7773
depends_on: List[str] = None,
7874
cache_config: CacheConfig = None,
@@ -83,11 +79,7 @@ def __init__(
8379
name(str): The name of the EMR step.
8480
display_name(str): The display name of the EMR step.
8581
description(str): The description of the EMR step.
86-
<<<<<<< HEAD
87-
job_flow_id(str): A string that uniquely identifies the job flow(cluster).
88-
=======
8982
cluster_id(str): A string that uniquely identifies the cluster.
90-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
9183
step_config(EMRStepConfig): One StepConfig to be executed by the job flow.
9284
depends_on(List[str]):
9385
A list of step names this `sagemaker.workflow.steps.EMRStep` depends on
@@ -96,23 +88,13 @@ def __init__(
9688
"""
9789
super(EMRStep, self).__init__(name, display_name, description, StepTypeEnum.EMR, depends_on)
9890

99-
<<<<<<< HEAD
100-
emr_step_args = {"JobFlowId": job_flow_id, "StepConfig": step_config.to_request()}
101-
self.args = emr_step_args
102-
self.cache_config = cache_config
103-
104-
self._properties = Properties(
105-
path=f"Steps.{name}", external_service_name="emr", shape_name="DescribeStepOutput"
106-
)
107-
=======
10891
emr_step_args = {"ClusterId": cluster_id, "StepConfig": step_config.to_request()}
10992
self.args = emr_step_args
11093
self.cache_config = cache_config
11194

11295
root_property = Properties(path=f"Steps.{name}", shape_name="Step", service_name="emr")
11396
root_property.__dict__["ClusterId"] = cluster_id
11497
self._properties = root_property
115-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
11698

11799
@property
118100
def arguments(self) -> RequestType:

src/sagemaker/workflow/properties.py

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -25,32 +25,22 @@
2525
class PropertiesMeta(type):
2626
"""Load an internal shapes attribute from the botocore service model
2727
28-
<<<<<<< HEAD
29-
_shapes = None
30-
_emr_shapes = None
31-
=======
3228
for sagemaker and emr service.
3329
"""
3430

3531
_shapes_map = dict()
36-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
3732
_primitive_types = {"string", "boolean", "integer", "float"}
3833

3934
def __new__(mcs, *args, **kwargs):
4035
"""Loads up the shapes from the botocore sagemaker service model."""
4136
if len(mcs._shapes_map.keys()) == 0:
4237
loader = botocore.loaders.Loader()
43-
<<<<<<< HEAD
44-
model = loader.load_service_model("sagemaker", "service-2")
45-
emr_model = loader.load_service_model("emr", "service-2")
46-
mcs._shapes = model["shapes"]
47-
mcs._emr_shapes = emr_model["shapes"]
48-
=======
38+
4939
sagemaker_model = loader.load_service_model("sagemaker", "service-2")
5040
emr_model = loader.load_service_model("emr", "service-2")
5141
mcs._shapes_map["sagemaker"] = sagemaker_model["shapes"]
5242
mcs._shapes_map["emr"] = emr_model["shapes"]
53-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
43+
5444
return super().__new__(mcs, *args, **kwargs)
5545

5646

@@ -62,11 +52,7 @@ def __init__(
6252
path: str,
6353
shape_name: str = None,
6454
shape_names: List[str] = None,
65-
<<<<<<< HEAD
66-
external_service_name: str = None,
67-
=======
6855
service_name: str = "sagemaker",
69-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
7056
):
7157
"""Create a Properties instance representing the given shape.
7258
@@ -79,13 +65,7 @@ def __init__(
7965
shape_names = [] if shape_names is None else shape_names
8066
self._shape_names = shape_names if shape_name is None else [shape_name] + shape_names
8167

82-
<<<<<<< HEAD
83-
shapes = Properties._shapes
84-
if external_service_name == "emr":
85-
shapes = Properties._emr_shapes
86-
=======
8768
shapes = Properties._shapes_map.get(service_name, {})
88-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
8969

9070
for name in self._shape_names:
9171
shape = shapes.get(name, {})

tests/unit/sagemaker/workflow/test_emr_step.py

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@
1919
from mock import Mock
2020

2121
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig
22-
<<<<<<< HEAD
23-
from sagemaker.workflow.properties import Properties
24-
=======
25-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
2622
from sagemaker.workflow.steps import CacheConfig
2723
from sagemaker.workflow.pipeline import Pipeline
2824
from sagemaker.workflow.parameters import ParameterString
@@ -53,11 +49,7 @@ def test_emr_step_with_one_step_config(sagemaker_session):
5349
name="MyEMRStep",
5450
display_name="MyEMRStep",
5551
description="MyEMRStepDescription",
56-
<<<<<<< HEAD
57-
job_flow_id="MyClusterID",
58-
=======
5952
cluster_id="MyClusterID",
60-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
6153
step_config=emr_step_config,
6254
depends_on=["TestStep"],
6355
cache_config=CacheConfig(enable_caching=True, expire_after="PT1H"),
@@ -67,11 +59,7 @@ def test_emr_step_with_one_step_config(sagemaker_session):
6759
"Name": "MyEMRStep",
6860
"Type": "EMR",
6961
"Arguments": {
70-
<<<<<<< HEAD
71-
"JobFlowId": "MyClusterID",
72-
=======
7362
"ClusterId": "MyClusterID",
74-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
7563
"StepConfig": {
7664
"HadoopJarStep": {
7765
"Args": ["--arg_0", "arg_0_value"],
@@ -89,12 +77,7 @@ def test_emr_step_with_one_step_config(sagemaker_session):
8977
"Description": "MyEMRStepDescription",
9078
"CacheConfig": {"Enabled": True, "ExpireAfter": "PT1H"},
9179
}
92-
<<<<<<< HEAD
93-
assert emr_step.properties.Step.expr == {"Get": "Steps.MyEMRStep.Step"}
9480

95-
96-
def test_pipeline_interpolates_lambda_outputs(sagemaker_session):
97-
=======
9881
assert emr_step.properties.ClusterId == "MyClusterID"
9982
assert emr_step.properties.ActionOnFailure.expr == {"Get": "Steps.MyEMRStep.ActionOnFailure"}
10083
assert emr_step.properties.Config.Args.expr == {"Get": "Steps.MyEMRStep.Config.Args"}
@@ -109,7 +92,6 @@ def test_pipeline_interpolates_lambda_outputs(sagemaker_session):
10992

11093

11194
def test_pipeline_interpolates_emr_outputs(sagemaker_session):
112-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
11395
parameter = ParameterString("MyStr")
11496

11597
emr_step_config_1 = EMRStepConfig(
@@ -121,11 +103,7 @@ def test_pipeline_interpolates_emr_outputs(sagemaker_session):
121103

122104
step_emr_1 = EMRStep(
123105
name="emr_step_1",
124-
<<<<<<< HEAD
125-
job_flow_id="MyClusterID",
126-
=======
127106
cluster_id="MyClusterID",
128-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
129107
display_name="emr_step_1",
130108
description="MyEMRStepDescription",
131109
depends_on=["TestStep"],
@@ -136,11 +114,7 @@ def test_pipeline_interpolates_emr_outputs(sagemaker_session):
136114

137115
step_emr_2 = EMRStep(
138116
name="emr_step_2",
139-
<<<<<<< HEAD
140-
job_flow_id="MyClusterID",
141-
=======
142117
cluster_id="MyClusterID",
143-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
144118
display_name="emr_step_2",
145119
description="MyEMRStepDescription",
146120
depends_on=["TestStep"],
@@ -167,11 +141,7 @@ def test_pipeline_interpolates_emr_outputs(sagemaker_session):
167141
"Name": "emr_step_1",
168142
"Type": "EMR",
169143
"Arguments": {
170-
<<<<<<< HEAD
171-
"JobFlowId": "MyClusterID",
172-
=======
173144
"ClusterId": "MyClusterID",
174-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
175145
"StepConfig": {
176146
"HadoopJarStep": {
177147
"Args": ["--arg_0", "arg_0_value"],
@@ -192,11 +162,7 @@ def test_pipeline_interpolates_emr_outputs(sagemaker_session):
192162
"Name": "emr_step_2",
193163
"Type": "EMR",
194164
"Arguments": {
195-
<<<<<<< HEAD
196-
"JobFlowId": "MyClusterID",
197-
=======
198165
"ClusterId": "MyClusterID",
199-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
200166
"StepConfig": {
201167
"HadoopJarStep": {"Jar": "s3:/script-runner/script-runner_2.jar"}
202168
},
@@ -207,15 +173,3 @@ def test_pipeline_interpolates_emr_outputs(sagemaker_session):
207173
},
208174
],
209175
}
210-
<<<<<<< HEAD
211-
212-
213-
def test_properties_describe_step_output():
214-
prop = Properties("Steps.MyStep", "DescribeStepOutput", external_service_name="emr")
215-
some_prop_names = ["Step"]
216-
print(prop.__dict__.keys())
217-
for name in some_prop_names:
218-
assert name in prop.__dict__.keys()
219-
assert prop.Step.expr == {"Get": "Steps.MyStep.Step"}
220-
=======
221-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline

tests/unit/sagemaker/workflow/test_properties.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,14 +71,6 @@ def test_properties_tuning_job():
7171

7272

7373
def test_properties_emr_step():
74-
<<<<<<< HEAD
75-
prop = Properties("Steps.MyStep", "DescribeStepOutput", external_service_name="emr")
76-
some_prop_names = ["Step"]
77-
for name in some_prop_names:
78-
assert name in prop.__dict__.keys()
79-
80-
assert prop.Step.expr == {"Get": "Steps.MyStep.Step"}
81-
=======
8274
prop = Properties("Steps.MyStep", "Step", service_name="emr")
8375
some_prop_names = ["Id", "Name", "Config", "ActionOnFailure", "Status"]
8476
for name in some_prop_names:
@@ -89,7 +81,6 @@ def test_properties_emr_step():
8981
assert prop.ActionOnFailure.expr == {"Get": "Steps.MyStep.ActionOnFailure"}
9082
assert prop.Config.Jar.expr == {"Get": "Steps.MyStep.Config.Jar"}
9183
assert prop.Status.State.expr == {"Get": "Steps.MyStep.Status.State"}
92-
>>>>>>> feature: Add EMRStep support in Sagemaker pipeline
9384

9485

9586
def test_properties_describe_model_package_output():

0 commit comments

Comments
 (0)