Skip to content

Commit 43979e8

Browse files
authored
Merge branch 'master' into fix/missing-jumpstart-estimator-args
2 parents 9202f5b + a8b6b1d commit 43979e8

28 files changed

+561
-49
lines changed

doc/workflows/pipelines/sagemaker.workflow.pipelines.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ Parallelism Configuration
103103
.. autoclass:: sagemaker.workflow.parallelism_config.ParallelismConfiguration
104104
:members:
105105

106+
Pipeline Definition Config
107+
--------------------------
108+
109+
.. autoclass:: sagemaker.workflow.pipeline_definition_config.PipelineDefinitionConfig
110+
106111
Pipeline Experiment Config
107112
--------------------------
108113

src/sagemaker/image_uri_config/djl-deepspeed.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"us-west-2": "763104351884"
3131
},
3232
"repository": "djl-inference",
33-
"tag_prefix": "0.22.1-deepspeed0.8.3-cu118"
33+
"tag_prefix": "0.22.1-deepspeed0.9.2-cu118"
3434
},
3535
"0.21.0": {
3636
"registries": {

src/sagemaker/model_monitor/clarify_model_monitoring.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@ def _create_baselining_processor(self):
175175
network_config=self.network_config,
176176
)
177177
baselining_processor.image_uri = self.image_uri
178+
baselining_processor.base_job_name = self.base_job_name
178179
return baselining_processor
179180

180181
def _upload_analysis_config(self, analysis_config, output_s3_uri, job_definition_name):

src/sagemaker/workflow/_utils.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
)
3535
from sagemaker.utils import _save_model, download_file_from_url
3636
from sagemaker.workflow.retry import RetryPolicy
37+
from sagemaker.workflow.utilities import trim_request_dict
3738

3839
if TYPE_CHECKING:
3940
from sagemaker.workflow.step_collections import StepCollection
@@ -412,6 +413,8 @@ def __init__(
412413
@property
413414
def arguments(self) -> RequestType:
414415
"""The arguments dict that are used to call `create_model_package`."""
416+
from sagemaker.workflow.utilities import _pipeline_config
417+
415418
model_name = self.name
416419

417420
if self.step_args:
@@ -492,9 +495,9 @@ def arguments(self) -> RequestType:
492495
if "Description" in request_dict:
493496
request_dict.pop("Description")
494497
logger.warning(warn_msg_template, "Description")
495-
if "ModelPackageName" in request_dict:
496-
request_dict.pop("ModelPackageName")
497-
logger.warning(warn_msg_template, "ModelPackageName")
498+
499+
# Continue to pop job name if not explicitly opted-in via config
500+
request_dict = trim_request_dict(request_dict, "ModelPackageName", _pipeline_config)
498501

499502
return request_dict
500503

src/sagemaker/workflow/automl_step.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from sagemaker.workflow.properties import Properties
2424
from sagemaker.workflow.retry import RetryPolicy
2525
from sagemaker.workflow.steps import ConfigurableRetryStep, CacheConfig, Step, StepTypeEnum
26-
from sagemaker.workflow.utilities import validate_step_args_input
26+
from sagemaker.workflow.utilities import validate_step_args_input, trim_request_dict
2727
from sagemaker.workflow.step_collections import StepCollection
2828

2929

@@ -89,10 +89,11 @@ def arguments(self) -> RequestType:
8989
NOTE: The `CreateAutoMLJob` request is not quite the
9090
args list that workflow needs.
9191
92-
The `AutoMLJobName`, `ModelDeployConfig` and `GenerateCandidateDefinitionsOnly`
92+
`ModelDeployConfig` and `GenerateCandidateDefinitionsOnly`
9393
attributes cannot be included.
9494
"""
9595
from sagemaker.workflow.utilities import execute_job_functions
96+
from sagemaker.workflow.utilities import _pipeline_config
9697

9798
# execute fit function in AutoML with saved parameters,
9899
# and store args in PipelineSession's _context
@@ -114,7 +115,10 @@ def arguments(self) -> RequestType:
114115
request_dict.pop("ModelDeployConfig", None)
115116
if "GenerateCandidateDefinitionsOnly" in request_dict:
116117
request_dict.pop("GenerateCandidateDefinitionsOnly", None)
117-
request_dict.pop("AutoMLJobName", None)
118+
# Continue to pop job name if not explicitly opted-in via config
119+
# AutoML Trims to AutoMLJo-2023-06-23-22-57-39-083
120+
request_dict = trim_request_dict(request_dict, "AutoMLJobName", _pipeline_config)
121+
118122
return request_dict
119123

120124
@property

src/sagemaker/workflow/clarify_check_step.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
from sagemaker.workflow.step_collections import StepCollection
4545
from sagemaker.workflow.steps import Step, StepTypeEnum, CacheConfig
4646
from sagemaker.workflow.check_job_config import CheckJobConfig
47+
from sagemaker.workflow.utilities import trim_request_dict
4748

4849
_DATA_BIAS_TYPE = "DATA_BIAS"
4950
_MODEL_BIAS_TYPE = "MODEL_BIAS"
@@ -253,6 +254,8 @@ def __init__(
253254
@property
254255
def arguments(self) -> RequestType:
255256
"""The arguments dict that is used to define the ClarifyCheck step."""
257+
from sagemaker.workflow.utilities import _pipeline_config
258+
256259
normalized_inputs, normalized_outputs = self._baselining_processor._normalize_args(
257260
inputs=[self._processing_params["config_input"], self._processing_params["data_input"]],
258261
outputs=[self._processing_params["result_output"]],
@@ -266,8 +269,8 @@ def arguments(self) -> RequestType:
266269
request_dict = self._baselining_processor.sagemaker_session._get_process_request(
267270
**process_args
268271
)
269-
if "ProcessingJobName" in request_dict:
270-
request_dict.pop("ProcessingJobName")
272+
# Continue to pop job name if not explicitly opted-in via config
273+
request_dict = trim_request_dict(request_dict, "ProcessingJobName", _pipeline_config)
271274

272275
return request_dict
273276

src/sagemaker/workflow/pipeline.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
)
3838
from sagemaker.workflow.execution_variables import ExecutionVariables
3939
from sagemaker.workflow.parameters import Parameter
40+
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
4041
from sagemaker.workflow.pipeline_experiment_config import PipelineExperimentConfig
4142
from sagemaker.workflow.parallelism_config import ParallelismConfiguration
4243
from sagemaker.workflow.properties import Properties
@@ -52,6 +53,8 @@
5253
ExecutionVariables.PIPELINE_NAME, ExecutionVariables.PIPELINE_EXECUTION_ID
5354
)
5455

56+
_DEFAULT_DEFINITION_CFG = PipelineDefinitionConfig(use_custom_job_prefix=False)
57+
5558

5659
class Pipeline(Entity):
5760
"""Pipeline for workflow."""
@@ -63,6 +66,7 @@ def __init__(
6366
pipeline_experiment_config: Optional[PipelineExperimentConfig] = _DEFAULT_EXPERIMENT_CFG,
6467
steps: Optional[Sequence[Union[Step, StepCollection]]] = None,
6568
sagemaker_session: Optional[Session] = None,
69+
pipeline_definition_config: Optional[PipelineDefinitionConfig] = _DEFAULT_DEFINITION_CFG,
6670
):
6771
"""Initialize a Pipeline
6872
@@ -84,12 +88,16 @@ def __init__(
8488
sagemaker_session (sagemaker.session.Session): Session object that manages interactions
8589
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
8690
pipeline creates one using the default AWS configuration chain.
91+
pipeline_definition_config (Optional[PipelineDefinitionConfig]): If set,
92+
the workflow customizes the pipeline definition using the configurations
93+
specified. By default, custom job-prefixing is turned off.
8794
"""
8895
self.name = name
8996
self.parameters = parameters if parameters else []
9097
self.pipeline_experiment_config = pipeline_experiment_config
9198
self.steps = steps if steps else []
9299
self.sagemaker_session = sagemaker_session if sagemaker_session else Session()
100+
self.pipeline_definition_config = pipeline_definition_config
93101

94102
self._version = "2020-12-01"
95103
self._metadata = dict()
@@ -105,7 +113,11 @@ def to_request(self) -> RequestType:
105113
"PipelineExperimentConfig": self.pipeline_experiment_config.to_request()
106114
if self.pipeline_experiment_config is not None
107115
else None,
108-
"Steps": build_steps(self.steps, self.name),
116+
"Steps": build_steps(
117+
self.steps,
118+
self.name,
119+
self.pipeline_definition_config,
120+
),
109121
}
110122

111123
def create(

src/sagemaker/workflow/pipeline_context.py

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from sagemaker.session import Session, SessionSettings
2222
from sagemaker.local import LocalSession
23+
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
2324

2425

2526
class _StepArguments:
@@ -87,13 +88,23 @@ class _PipelineConfig:
8788
step_name (str): step name
8889
code_hash (str): a hash of the code artifact for the particular step
8990
config_hash (str): a hash of the config artifact for the particular step (Processing)
91+
pipeline_definition_config (PipelineDefinitionConfig): a configuration used to toggle
92+
feature flags persistent in a pipeline definition
9093
"""
9194

92-
def __init__(self, pipeline_name, step_name, code_hash, config_hash):
95+
def __init__(
96+
self,
97+
pipeline_name: str,
98+
step_name: str,
99+
code_hash: str,
100+
config_hash: str,
101+
pipeline_definition_config: PipelineDefinitionConfig,
102+
):
93103
self.pipeline_name = pipeline_name
94104
self.step_name = step_name
95105
self.code_hash = code_hash
96106
self.config_hash = config_hash
107+
self.pipeline_definition_config = pipeline_definition_config
97108

98109

99110
class PipelineSession(Session):
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
"""Pipeline definition config for SageMaker pipeline."""
14+
from __future__ import absolute_import
15+
16+
17+
class PipelineDefinitionConfig:
18+
"""Pipeline Definition Configuration for SageMaker pipeline."""
19+
20+
def __init__(self, use_custom_job_prefix: bool):
21+
"""Create a `PipelineDefinitionConfig`.
22+
23+
Examples: Use a `PipelineDefinitionConfig` to turn on custom job prefixing::
24+
25+
PipelineDefinitionConfig(use_custom_job_prefix=True)
26+
27+
Args:
28+
use_custom_job_prefix (bool): A feature flag to toggle on/off custom name prefixing
29+
during pipeline orchestration.
30+
"""
31+
self.use_custom_job_prefix = use_custom_job_prefix

src/sagemaker/workflow/quality_check_step.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from sagemaker.workflow.step_collections import StepCollection
3232
from sagemaker.workflow.steps import Step, StepTypeEnum, CacheConfig
3333
from sagemaker.workflow.check_job_config import CheckJobConfig
34+
from sagemaker.workflow.utilities import trim_request_dict
3435

3536
_CONTAINER_BASE_PATH = "/opt/ml/processing"
3637
_CONTAINER_INPUT_PATH = "input"
@@ -225,6 +226,8 @@ def __init__(
225226
@property
226227
def arguments(self) -> RequestType:
227228
"""The arguments dict that is used to define the QualityCheck step."""
229+
from sagemaker.workflow.utilities import _pipeline_config
230+
228231
normalized_inputs, normalized_outputs = self._baselining_processor._normalize_args(
229232
inputs=self._baseline_job_inputs,
230233
outputs=[self._baseline_output],
@@ -238,8 +241,8 @@ def arguments(self) -> RequestType:
238241
request_dict = self._baselining_processor.sagemaker_session._get_process_request(
239242
**process_args
240243
)
241-
if "ProcessingJobName" in request_dict:
242-
request_dict.pop("ProcessingJobName")
244+
# Continue to pop job name if not explicitly opted-in via config
245+
request_dict = trim_request_dict(request_dict, "ProcessingJobName", _pipeline_config)
243246

244247
return request_dict
245248

src/sagemaker/workflow/steps.py

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
from sagemaker.workflow.entities import PipelineVariable
5151
from sagemaker.workflow.functions import Join, JsonGet
5252
from sagemaker.workflow.retry import RetryPolicy
53+
from sagemaker.workflow.utilities import trim_request_dict
5354

5455
if TYPE_CHECKING:
5556
from sagemaker.workflow.step_collections import StepCollection
@@ -471,9 +472,10 @@ def arguments(self) -> RequestType:
471472
"""The arguments dictionary that is used to call `create_training_job`.
472473
473474
NOTE: The `CreateTrainingJob` request is not quite the args list that workflow needs.
474-
The `TrainingJobName` and `ExperimentConfig` attributes cannot be included.
475+
`ExperimentConfig` attribute cannot be included.
475476
"""
476477
from sagemaker.workflow.utilities import execute_job_functions
478+
from sagemaker.workflow.utilities import _pipeline_config
477479

478480
if self.step_args:
479481
# execute fit function with saved parameters,
@@ -493,7 +495,9 @@ def arguments(self) -> RequestType:
493495
if "HyperParameters" in request_dict:
494496
request_dict["HyperParameters"].pop("sagemaker_job_name", None)
495497

496-
request_dict.pop("TrainingJobName", None)
498+
# Continue to pop job name if not explicitly opted-in via config
499+
request_dict = trim_request_dict(request_dict, "TrainingJobName", _pipeline_config)
500+
497501
Step._trim_experiment_config(request_dict)
498502

499503
return request_dict
@@ -595,8 +599,8 @@ def arguments(self) -> RequestType:
595599
"""The arguments dictionary that is used to call `create_model`.
596600
597601
NOTE: The `CreateModelRequest` is not quite the args list that workflow needs.
598-
`ModelName` cannot be included in the arguments.
599602
"""
603+
from sagemaker.workflow.utilities import _pipeline_config
600604

601605
if self.step_args:
602606
request_dict = self.step_args
@@ -620,7 +624,9 @@ def arguments(self) -> RequestType:
620624
vpc_config=self.model.vpc_config,
621625
enable_network_isolation=self.model.enable_network_isolation(),
622626
)
623-
request_dict.pop("ModelName", None)
627+
628+
# Continue to pop model name if not explicitly opted-in via config
629+
request_dict = trim_request_dict(request_dict, "ModelName", _pipeline_config)
624630

625631
return request_dict
626632

@@ -702,9 +708,10 @@ def arguments(self) -> RequestType:
702708
"""The arguments dictionary that is used to call `create_transform_job`.
703709
704710
NOTE: The `CreateTransformJob` request is not quite the args list that workflow needs.
705-
`TransformJobName` and `ExperimentConfig` cannot be included in the arguments.
711+
`ExperimentConfig` cannot be included in the arguments.
706712
"""
707713
from sagemaker.workflow.utilities import execute_job_functions
714+
from sagemaker.workflow.utilities import _pipeline_config
708715

709716
if self.step_args:
710717
# execute transform function with saved parameters,
@@ -733,7 +740,9 @@ def arguments(self) -> RequestType:
733740
**transform_args
734741
)
735742

736-
request_dict.pop("TransformJobName", None)
743+
# Continue to pop job name if not explicitly opted-in via config
744+
request_dict = trim_request_dict(request_dict, "TransformJobName", _pipeline_config)
745+
737746
Step._trim_experiment_config(request_dict)
738747

739748
return request_dict
@@ -864,9 +873,10 @@ def arguments(self) -> RequestType:
864873
"""The arguments dictionary that is used to call `create_processing_job`.
865874
866875
NOTE: The `CreateProcessingJob` request is not quite the args list that workflow needs.
867-
`ProcessingJobName` and `ExperimentConfig` cannot be included in the arguments.
876+
`ExperimentConfig` cannot be included in the arguments.
868877
"""
869878
from sagemaker.workflow.utilities import execute_job_functions
879+
from sagemaker.workflow.utilities import _pipeline_config
870880

871881
if self.step_args:
872882
# execute run function with saved parameters,
@@ -890,7 +900,9 @@ def arguments(self) -> RequestType:
890900
)
891901
request_dict = self.processor.sagemaker_session._get_process_request(**process_args)
892902

893-
request_dict.pop("ProcessingJobName", None)
903+
# Continue to pop job name if not explicitly opted-in via config
904+
request_dict = trim_request_dict(request_dict, "ProcessingJobName", _pipeline_config)
905+
894906
Step._trim_experiment_config(request_dict)
895907

896908
return request_dict
@@ -1025,9 +1037,9 @@ def arguments(self) -> RequestType:
10251037
10261038
NOTE: The `CreateHyperParameterTuningJob` request is not quite the
10271039
args list that workflow needs.
1028-
The `HyperParameterTuningJobName` attribute cannot be included.
10291040
"""
10301041
from sagemaker.workflow.utilities import execute_job_functions
1042+
from sagemaker.workflow.utilities import _pipeline_config
10311043

10321044
if self.step_args:
10331045
# execute fit function with saved parameters,
@@ -1048,7 +1060,11 @@ def arguments(self) -> RequestType:
10481060
tuner_args = _TuningJob._get_tuner_args(self.tuner, self.inputs)
10491061
request_dict = self.tuner.sagemaker_session._get_tuning_request(**tuner_args)
10501062

1051-
request_dict.pop("HyperParameterTuningJobName", None)
1063+
# Continue to pop job name if not explicitly opted-in via config
1064+
request_dict = trim_request_dict(
1065+
request_dict, "HyperParameterTuningJobName", _pipeline_config
1066+
)
1067+
10521068
return request_dict
10531069

10541070
@property

0 commit comments

Comments
 (0)