@@ -61,62 +61,89 @@ def to_request(self) -> RequestType:
61
61
return config
62
62
63
63
64
- def validate_cluster_config (cluster_config , step_name ):
65
- """Validates user provided cluster_config.
66
-
67
- Args:
68
- cluster_config(Union[Dict[str, Any], List[Dict[str, Any]]]):
69
- user provided cluster configuration.
70
- step_name: The name of the EMR step.
71
- """
72
-
73
- instances = "Instances"
74
- instancegroups = "InstanceGroups"
75
- instancefleets = "InstanceFleets"
76
- prefix_with_in = "In EMRStep " + step_name + ", "
77
-
78
- if (
79
- "Name" in cluster_config
80
- or "AutoTerminationPolicy" in cluster_config
81
- or "Steps" in cluster_config
82
- ):
83
- raise Exception (
84
- prefix_with_in + "cluster_config should not contain any of Name, "
85
- "AutoTerminationPolicy and/or Steps"
86
- )
64
# Dictionary keys that the cluster_config validation looks up.
instances, instancegroups, instancefleets = "Instances", "InstanceGroups", "InstanceFleets"

# Message template; {step_name} is filled in with str.format at raise time.
err_str_with_name_auto_termination_or_steps = (
    "In EMRStep {step_name}, cluster_config should not contain "
    "any of the Name, AutoTerminationPolicy and/or Steps."
)
87
72
88
- if instances not in cluster_config :
89
- raise Exception (prefix_with_in + "cluster_config must contain Instances" )
73
# Message template; the doubled braces keep {step_name} as a placeholder that is
# filled in later with str.format, while the key name is substituted now.
err_str_without_instance = "In EMRStep {{step_name}}, cluster_config must contain {}.".format(
    instances
)
90
74
91
- if (
92
- "KeepJobFlowAliveWhenNoSteps" in cluster_config [instances ]
93
- or "TerminationProtected" in cluster_config [instances ]
94
- ):
95
- raise Exception (
96
- prefix_with_in + instances + " should not contain "
97
- "KeepJobFlowAliveWhenNoSteps or "
98
- "TerminationProtected"
99
- )
100
-
101
- if (
102
- instancegroups in cluster_config [instances ] and instancefleets in cluster_config [instances ]
103
- ) or (
104
- instancegroups not in cluster_config [instances ]
105
- and instancefleets not in cluster_config [instances ]
106
- ):
107
- raise Exception (
108
- prefix_with_in
109
- + instances
110
- + " should contain either "
111
- + instancegroups
112
- + " or "
113
- + instancefleets
114
- )
75
# Message templates; the doubled braces keep {step_name} as a placeholder that is
# filled in later with str.format, while the key names are substituted now.
err_str_with_keepjobflow_or_terminationprotected = (
    "In EMRStep {{step_name}}, {} should not contain "
    "KeepJobFlowAliveWhenNoSteps or TerminationProtected."
).format(instances)

err_str_both_or_none_instancegroups_or_instancefleets = (
    "In EMRStep {{step_name}}, {} should contain either {} or {}."
).format(instances, instancegroups, instancefleets)
90
+
91
# Message template used when both cluster_id and cluster_config are supplied.
# Adjacent string literals are concatenated verbatim, so each fragment must carry
# its own trailing space; without them the text ran together as
# "cluster_idor cluster_config.To use ...".
err_str_with_both_cluster_id_and_cluster_cfg = (
    "EMRStep {step_name} can not have both cluster_id "
    "or cluster_config. "
    "To use EMRStep with "
    "cluster_config, cluster_id "
    "must be explicitly set to None."
)

# Message template used when neither cluster_id nor cluster_config is supplied.
err_str_without_cluster_id_and_cluster_cfg = (
    "EMRStep {step_name} must have either cluster_id or cluster_config"
)
115
102
116
103
117
104
class EMRStep (Step ):
118
105
"""EMR step for workflow."""
119
106
107
def _validate_cluster_config(self, cluster_config, step_name):
    """Check a user provided cluster_config and reject unsupported layouts.

    Args:
        cluster_config(Union[Dict[str, Any], List[Dict[str, Any]]]):
            user provided cluster configuration.
        step_name: The name of the EMR step; interpolated into the error messages.

    Raises:
        ValueError: If cluster_config contains Name, AutoTerminationPolicy or Steps;
            if it has no Instances entry; if Instances contains
            KeepJobFlowAliveWhenNoSteps or TerminationProtected; or if Instances
            contains both, or neither, of InstanceGroups and InstanceFleets.
    """
    disallowed_top_level = ("Name", "AutoTerminationPolicy", "Steps")
    if any(key in cluster_config for key in disallowed_top_level):
        message = err_str_with_name_auto_termination_or_steps.format(step_name=step_name)
        raise ValueError(message)

    if instances not in cluster_config:
        raise ValueError(err_str_without_instance.format(step_name=step_name))

    instances_config = cluster_config[instances]

    disallowed_in_instances = ("KeepJobFlowAliveWhenNoSteps", "TerminationProtected")
    if any(key in instances_config for key in disallowed_in_instances):
        message = err_str_with_keepjobflow_or_terminationprotected.format(step_name=step_name)
        raise ValueError(message)

    # Exactly one of the two layouts must be present: equal membership flags mean
    # either both were given or neither was.
    has_groups = instancegroups in instances_config
    has_fleets = instancefleets in instances_config
    if has_groups == has_fleets:
        message = err_str_both_or_none_instancegroups_or_instancefleets.format(
            step_name=step_name
        )
        raise ValueError(message)
146
+
120
147
def __init__ (
121
148
self ,
122
149
name : str ,
@@ -128,7 +155,7 @@ def __init__(
128
155
cache_config : CacheConfig = None ,
129
156
cluster_config : RequestType = None ,
130
157
):
131
- """Constructs a EMRStep.
158
+ """Constructs an EMRStep.
132
159
133
160
Args:
134
161
name(str): The name of the EMR step.
@@ -141,40 +168,36 @@ def __init__(
141
168
depends on.
142
169
cache_config(CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
143
170
cluster_config(Union[Dict[str, Any], List[Dict[str, Any]]]): The recipe of the
144
- EMR Cluster. It is a dictionary.
145
- The elements are defined in the Request Syntax Section:
171
+ EMR cluster, passed as a dictionary. The elements are defined in the request syntax
172
+ for RunJobFlow. However, the following elements are not recognized as part of the
173
+ cluster configuration and you should not include them in the dictionary:
174
+ 1. cluster_config[Name]
175
+ 2. cluster_config[Steps]
176
+ 3. cluster_config[AutoTerminationPolicy]
177
+ 4. cluster_config[Instances][KeepJobFlowAliveWhenNoSteps]
178
+ 5. cluster_config[Instances][TerminationProtected]
179
+ For more information about the fields you can include in your cluster
180
+ configuration, see
146
181
https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html
147
- However, the following five elements are restricted, and must not present
148
- in the dictionary:
149
- 1. cluster_config[Name]
150
- 2. cluster_config[Steps]
151
- 3. cluster_config[AutoTerminationPolicy]
152
- 4. cluster_config[Instances][KeepJobFlowAliveWhenNoSteps]
153
- 5. cluster_config[Instances][TerminationProtected]
154
- Note that, if user wants to use cluster_config, then they have to explicitly set
155
- cluster_id as None
156
-
182
+ Note that if you want to use cluster_config,
183
+ then you have to set cluster_id as None.
157
184
"""
158
185
super (EMRStep , self ).__init__ (name , display_name , description , StepTypeEnum .EMR , depends_on )
159
186
160
187
emr_step_args = {"StepConfig" : step_config .to_request ()}
161
188
root_property = Properties (step_name = name , shape_name = "Step" , service_name = "emr" )
162
189
163
190
if cluster_id is None and cluster_config is None :
164
- raise Exception ( "EMRStep " + name + " must have either cluster_id or cluster_config" )
191
+ raise ValueError ( err_str_without_cluster_id_and_cluster_cfg . format ( step_name = name ) )
165
192
166
193
if cluster_id is not None and cluster_config is not None :
167
- raise Exception (
168
- "EMRStep " + name + " can not have both cluster_id or cluster_config. "
169
- "If user wants to use cluster_config, then they "
170
- "have to explicitly set cluster_id as None"
171
- )
194
+ raise ValueError (err_str_with_both_cluster_id_and_cluster_cfg .format (step_name = name ))
172
195
173
196
if cluster_id is not None :
174
197
emr_step_args ["ClusterId" ] = cluster_id
175
198
root_property .__dict__ ["ClusterId" ] = cluster_id
176
199
elif cluster_config is not None :
177
- validate_cluster_config (cluster_config , name )
200
+ self . _validate_cluster_config (cluster_config , name )
178
201
emr_step_args ["ClusterConfig" ] = cluster_config
179
202
root_property .__dict__ ["ClusterConfig" ] = cluster_config
180
203
0 commit comments