Skip to content

fix: RegisterModel step and custom dependency support #2262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/sagemaker/workflow/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(
entry_point: str,
source_dir: str = None,
dependencies: List = None,
depends_on: List[str] = None,
):
"""Constructs a TrainingStep, given an `EstimatorBase` instance.

Expand Down Expand Up @@ -102,7 +103,9 @@ def __init__(
inputs = TrainingInput(self._model_prefix)

# super!
super(_RepackModelStep, self).__init__(name=name, estimator=repacker, inputs=inputs)
super(_RepackModelStep, self).__init__(
name=name, depends_on=depends_on, estimator=repacker, inputs=inputs
)

def _prepare_for_repacking(self):
"""Prepares the source for the estimator."""
Expand Down Expand Up @@ -221,6 +224,7 @@ def __init__(
image_uri=None,
compile_model_family=None,
description=None,
depends_on: List[str] = None,
**kwargs,
):
"""Constructor of a register model step.
Expand Down Expand Up @@ -248,9 +252,11 @@ def __init__(
compile_model_family (str): Instance family for compiled model, if specified, a compiled
model will be used (default: None).
description (str): Model Package description (default: None).
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep`
depends on
**kwargs: additional arguments to `create_model`.
"""
super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL)
super(_RegisterModelStep, self).__init__(name, StepTypeEnum.REGISTER_MODEL, depends_on)
self.estimator = estimator
self.model_data = model_data
self.content_types = content_types
Expand Down
3 changes: 2 additions & 1 deletion src/sagemaker/workflow/condition_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class ConditionStep(Step):
def __init__(
self,
name: str,
depends_on: List[str] = None,
conditions: List[Condition] = None,
if_steps: List[Union[Step, StepCollection]] = None,
else_steps: List[Union[Step, StepCollection]] = None,
Expand All @@ -60,7 +61,7 @@ def __init__(
and `sagemaker.workflow.step_collections.StepCollection` instances that are
marked as ready for execution if the list of conditions evaluates to False.
"""
super(ConditionStep, self).__init__(name, StepTypeEnum.CONDITION)
super(ConditionStep, self).__init__(name, StepTypeEnum.CONDITION, depends_on)
self.conditions = conditions or []
self.if_steps = if_steps or []
self.else_steps = else_steps or []
Expand Down
21 changes: 21 additions & 0 deletions src/sagemaker/workflow/step_collections.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def __init__(
response_types,
inference_instances,
transform_instances,
depends_on: List[str] = None,
model_package_group_name=None,
model_metrics=None,
approval_status=None,
Expand All @@ -80,6 +81,8 @@ def __init__(
generate inferences in real-time (default: None).
transform_instances (list): A list of the instance types on which a transformation
job can be run or on which an endpoint can be deployed (default: None).
depends_on (List[str]): The list of step names the first step in the collection
depends on
model_package_group_name (str): The Model Package Group name, exclusive to
`model_package_name`, using `model_package_group_name` makes the Model Package
versioned (default: None).
Expand All @@ -94,12 +97,15 @@ def __init__(
**kwargs: additional arguments to `create_model`.
"""
steps: List[Step] = []
repack_model = False
if "entry_point" in kwargs:
repack_model = True
entry_point = kwargs["entry_point"]
source_dir = kwargs.get("source_dir")
dependencies = kwargs.get("dependencies")
repack_model_step = _RepackModelStep(
name=f"{name}RepackModel",
depends_on=depends_on,
estimator=estimator,
model_data=model_data,
entry_point=entry_point,
Expand All @@ -109,6 +115,11 @@ def __init__(
steps.append(repack_model_step)
model_data = repack_model_step.properties.ModelArtifacts.S3ModelArtifacts

# remove kwargs consumed by model repacking step
kwargs.pop("entry_point", None)
kwargs.pop("source_dir", None)
kwargs.pop("dependencies", None)

register_model_step = _RegisterModelStep(
name=name,
estimator=estimator,
Expand All @@ -125,6 +136,9 @@ def __init__(
description=description,
**kwargs,
)
if not repack_model:
register_model_step.add_depends_on(depends_on)

steps.append(register_model_step)
self.steps = steps

Expand Down Expand Up @@ -155,6 +169,7 @@ def __init__(
max_payload=None,
tags=None,
volume_kms_key=None,
depends_on: List[str] = None,
**kwargs,
):
"""Construct steps required for a Transformer step collection:
Expand Down Expand Up @@ -191,6 +206,8 @@ def __init__(
it will be the format of the batch transform output.
env (dict): The Environment variables to be set for use during the
transform job (default: None).
depends_on (List[str]): The list of step names the first step in
the collection depends on
"""
steps = []
if "entry_point" in kwargs:
Expand All @@ -199,6 +216,7 @@ def __init__(
dependencies = kwargs.get("dependencies")
repack_model_step = _RepackModelStep(
name=f"{name}RepackModel",
depends_on=depends_on,
estimator=estimator,
model_data=model_data,
entry_point=entry_point,
Expand Down Expand Up @@ -227,6 +245,9 @@ def predict_wrapper(endpoint, session):
model=model,
inputs=model_inputs,
)
if "entry_point" not in kwargs and depends_on:
# if the CreateModelStep is the first step in the collection
model_step.add_depends_on(depends_on)
steps.append(model_step)

transformer = Transformer(
Expand Down
39 changes: 30 additions & 9 deletions src/sagemaker/workflow/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,12 @@ class Step(Entity):
Attributes:
name (str): The name of the step.
step_type (StepTypeEnum): The type of the step.
depends_on (List[str]): The list of step names the current step depends on
"""

name: str = attr.ib(factory=str)
step_type: StepTypeEnum = attr.ib(factory=StepTypeEnum.factory)
depends_on: List[str] = attr.ib(default=None)

@property
@abc.abstractmethod
Expand All @@ -80,11 +82,22 @@ def properties(self):

def to_request(self) -> RequestType:
"""Gets the request structure for workflow service calls."""
return {
request_dict = {
"Name": self.name,
"Type": self.step_type.value,
"Arguments": self.arguments,
}
if self.depends_on:
request_dict["DependsOn"] = self.depends_on
return request_dict

def add_depends_on(self, step_names: List[str]):
"""Add step names to the current step depends on list"""
if not step_names:
return
if not self.depends_on:
self.depends_on = []
self.depends_on.extend(step_names)

@property
def ref(self) -> Dict[str, str]:
Expand Down Expand Up @@ -133,6 +146,7 @@ def __init__(
estimator: EstimatorBase,
inputs: TrainingInput = None,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
):
"""Construct a TrainingStep, given an `EstimatorBase` instance.

Expand All @@ -144,8 +158,10 @@ def __init__(
estimator (EstimatorBase): A `sagemaker.estimator.EstimatorBase` instance.
inputs (TrainingInput): A `sagemaker.inputs.TrainingInput` instance. Defaults to `None`.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TrainingStep`
depends on
"""
super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING)
super(TrainingStep, self).__init__(name, StepTypeEnum.TRAINING, depends_on)
self.estimator = estimator
self.inputs = inputs
self._properties = Properties(
Expand Down Expand Up @@ -188,10 +204,7 @@ class CreateModelStep(Step):
"""CreateModel step for workflow."""

def __init__(
self,
name: str,
model: Model,
inputs: CreateModelInput,
self, name: str, model: Model, inputs: CreateModelInput, depends_on: List[str] = None
):
"""Construct a CreateModelStep, given an `sagemaker.model.Model` instance.

Expand All @@ -203,8 +216,10 @@ def __init__(
model (Model): A `sagemaker.model.Model` instance.
inputs (CreateModelInput): A `sagemaker.inputs.CreateModelInput` instance.
Defaults to `None`.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.CreateModelStep`
depends on
"""
super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL)
super(CreateModelStep, self).__init__(name, StepTypeEnum.CREATE_MODEL, depends_on)
self.model = model
self.inputs = inputs or CreateModelInput()

Expand Down Expand Up @@ -247,6 +262,7 @@ def __init__(
transformer: Transformer,
inputs: TransformInput,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
):
"""Constructs a TransformStep, given an `Transformer` instance.

Expand All @@ -258,8 +274,10 @@ def __init__(
transformer (Transformer): A `sagemaker.transformer.Transformer` instance.
inputs (TransformInput): A `sagemaker.inputs.TransformInput` instance.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.TransformStep`
depends on
"""
super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM)
super(TransformStep, self).__init__(name, StepTypeEnum.TRANSFORM, depends_on)
self.transformer = transformer
self.inputs = inputs
self.cache_config = cache_config
Expand Down Expand Up @@ -320,6 +338,7 @@ def __init__(
code: str = None,
property_files: List[PropertyFile] = None,
cache_config: CacheConfig = None,
depends_on: List[str] = None,
):
"""Construct a ProcessingStep, given a `Processor` instance.

Expand All @@ -340,8 +359,10 @@ def __init__(
property_files (List[PropertyFile]): A list of property files that workflow looks
for and resolves from the configured processing output list.
cache_config (CacheConfig): A `sagemaker.workflow.steps.CacheConfig` instance.
depends_on (List[str]): A list of step names this `sagemaker.workflow.steps.ProcessingStep`
depends on
"""
super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING)
super(ProcessingStep, self).__init__(name, StepTypeEnum.PROCESSING, depends_on)
self.processor = processor
self.inputs = inputs
self.outputs = outputs
Expand Down
Loading