-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feature: add helper method to generate pipeline adjacency list #3128
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
We identified new issues on unchanged lines of code. Navigate to the Amazon CodeGuru Reviewer console to view the recommendations to fix them. |
root_prop = Properties(path=root_path) | ||
root_prop.__dict__["Outcome"] = Properties(f"{root_path}.Outcome") | ||
root_prop = Properties(step_name=name) | ||
root_prop.__dict__["Outcome"] = Properties(step_name=name, path="Outcome") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.
Modifying object.__dict__
directly or writing to an instance of a class __dict__
attribute directly is not recommended. Inside every module is a __dict__
object.dict attribute which contains its symbol table. If you modify object.__dict__
, then the symbol table is changed. Also, direct assignment to the __dict__
attribute is not possible.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
Codecov Report
@@ Coverage Diff @@
## master #3128 +/- ##
==========================================
+ Coverage 89.62% 89.71% +0.08%
==========================================
Files 200 200
Lines 17391 17565 +174
==========================================
+ Hits 15587 15758 +171
- Misses 1804 1807 +3
Continue to review full report at Codecov.
|
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
@@ -62,7 +63,9 @@ def __init__( | |||
shape_name (str): The botocore service model shape name. | |||
shape_names (str): A List of the botocore service model shape name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
step_name
is missing in the doc string
def __init__(self, path: str, shape_name: str = None, service_name: str = "sagemaker"): | ||
def __init__( | ||
self, step_name: str, path: str, shape_name: str = None, service_name: str = "sagemaker" | ||
): | ||
"""Create a PropertiesList instance representing the given shape. | ||
|
||
Args: | ||
path (str): The parent path of the PropertiesList instance. | ||
shape_name (str): The botocore service model shape name. | ||
service_name (str): The botocore service name. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
step_name is missing in the doc string
src/sagemaker/workflow/properties.py
Outdated
self.shape_name = shape_name | ||
self.service_name = service_name | ||
self.path = path |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the self.path
is a relative path without the root: f"Steps.{step_name}."
while the self._path
(from super class) is an absolute path with the root right?
If so can we add some comments or update the attribute name to make it more clear?
src/sagemaker/workflow/steps.py
Outdated
if isinstance(depends_on_step, str): | ||
step_dependencies.add(depends_on_step) | ||
elif isinstance(depends_on_step, Step): | ||
step_dependencies.add(depends_on_step.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if a StepCollection is in the depends on list?
This merged commit just supports the StepCollection to be depended on.
Can you add some tests on this as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. The StepCollection needs to be treated with care.
for step in self.steps: | ||
dependencies[step.name] = step._find_step_dependencies() | ||
return dependencies | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This merged commit just supports StepCollection properties.
What if another step is referencing the StepCollection properties? Would the last sub step name be appended in the adjacency list?
Is there a test for this case?
src/sagemaker/workflow/steps.py
Outdated
def _find_step_dependencies(self): | ||
"""Find the step names this step is dependent on.""" | ||
|
||
def _find_dependencies_in_step_arguments(obj): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to move this out. You'll need it when run the pipeline
def _find_property_references_in_step_arguments() -> Set[Properties]:
...
src/sagemaker/workflow/steps.py
Outdated
if isinstance(depends_on_step, str): | ||
step_dependencies.add(depends_on_step) | ||
elif isinstance(depends_on_step, Step): | ||
step_dependencies.add(depends_on_step.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right. The StepCollection needs to be treated with care.
src/sagemaker/workflow/pipeline.py
Outdated
def _generate_adjacency_list(self): | ||
"""Generate an adjacency list representing the step dependency DAG in this pipeline.""" | ||
adjacency_list = {} | ||
for step in self.steps: | ||
if isinstance(step, Step): | ||
adjacency_list[step.name] = step._find_step_dependencies() | ||
elif isinstance(step, StepCollection): | ||
adjacency_list.update(step._find_step_dependencies()) | ||
return adjacency_list |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggest we create a new class and offload the following task to this class
- the building of the graph,
- validation (no cycle), this is the major validation we have to add in the SDK.
- traversal. We can make it an iterable. The executor simply loop through each of the steps and run them.
class _PipelineGraph:
def __init__ (self, steps: List[Step]):
self.adjacency_list = {}
...
def __iter__():
...
def __next()__() -> List[Step]:
...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Condition step needs to be treated in a special way.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
src/sagemaker/workflow/pipeline.py
Outdated
if self.is_duplicate_step_name(steps): | ||
raise ValueError("Pipeline steps cannot have duplicate names.") | ||
self.step_map = PipelineGraph._generate_step_map(steps) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These two could be merged into one.
src/sagemaker/workflow/pipeline.py
Outdated
step_map[step.name] = step | ||
for inner_step in step.steps: | ||
step_map[inner_step.name] = inner_step |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A StepCollection is literally a list of steps. It is just an abstraction to the users. When building the graph, we just need to take care of the steps inside.
src/sagemaker/workflow/pipeline.py
Outdated
elif isinstance(step, StepCollection): | ||
step_collection_dependencies = step._find_step_dependencies(self.step_map) | ||
for k, v in step_collection_dependencies.items(): | ||
dependency_list[k].update(v) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can skip the parent StepCollection here.
for k, v in step_collection_dependencies.items(): | ||
dependency_list[k].update(v) | ||
|
||
if isinstance(step, ConditionStep): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The if else branches in the condition step are already flattened out. So it just need to be treated as a single node.
The condition step itself has condition expressions, which may contain property references.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment can be ignored.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
src/sagemaker/workflow/pipeline.py
Outdated
step_map.update( | ||
PipelineGraph._generate_step_map(step.if_steps + step.else_steps) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The names of steps in if/else branches can't conflict with steps outside. So it is better to do something like
for step in steps:
if step.name in self.step_map:
raise ValueError
self.step_map[step.name] = step
if isinstance(step, ConditionStep):
self.generate_step_map(...)
elif isinstance(step, StepCollection):
self.generate_step_map(step.steps)
for k, v in step_collection_dependencies.items(): | ||
dependency_list[k].update(v) | ||
|
||
if isinstance(step, ConditionStep): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment can be ignored.
return "arn:role" | ||
|
||
|
||
def test_pipeline_duplicate_step_name(sagemaker_session_mock): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of more scenarios:
- duplicate_step_name_in_condition_step
- duplicate_step_name_in_step_collection
assert "Pipeline steps cannot have duplicate names." in str(error.value) | ||
|
||
|
||
def test_pipeline_graph_acyclic(sagemaker_session_mock): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more scenarios with condition steps
- test_pipeline_graph_acyclic_with_condition_step
- CustomStep -> ConditionStep/IfSteps/ElseStep: dependencies are defined via property reference
- CustomStep -> ConditionStep/IfStep/ElseStep: dependencies are defined via dependsOn attribute
- test_pipeline_graph_acyclic_with_step_collection
- CustomStep -> ModelStep -> CustomStep: dependencies are defined via property reference
- CustomStep -> ModelStep -> CustomStep: dependencies are defined via dependsOn attribute
src/sagemaker/workflow/entities.py
Outdated
|
||
@property | ||
@abc.abstractmethod | ||
def depends_on(self) -> List[str]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make this a protected method
_referenced_steps
"""Return the next Step node from the Topological sort order.""" | ||
|
||
while self.stack: | ||
return self.step_map.get(self.stack.pop()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.
You are using the get
method without a default argument to return the value of a key in a dictionary. We recommended that you use a default argument so that if the value for your key is not found, a default value is returned. If a default value is not provided and the key is not found, then None
is returned.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
adjacency_list = PipelineGraph.from_pipeline(pipeline).adjacency_list | ||
assert ordered(adjacency_list) == ordered( | ||
{"CondStep": ["IfStep", "ElseStep"], "IfStep": [], "ElseStep": []} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From visualization perspective, seems it's not able to tell which steps, in adjacency list of "CondStep", are in IF branch and which are in ELSE branch.
Please correct me if I'm wrong
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nic's design doc uses the following adjacency list format:
[
{"StepName": 'AbaloneProcess',
"OutBoundEdges": [
{
"nextStepName": "AbaloneTrain",
"edgeLabel": None <<<<<<<<<<< indicating the IF or ELSE branch
},
{
"nextStepName": "AbaloneEval",
"edgeLabel": None
}
]
},
{"StepName": 'AbaloneMSECond',
"OutBoundEdges": [
{
"nextStepId": "AbaloneRegisterModel",
"edgeLabel": "true"
},
{
"nextStepId": "AbaloneCreateModel",
"edgeLabel": "true"
},
{
"nextStepId": "AbaloneMSEFail",
"edgeLabel": "false"
}
]
},
Can we also apply it here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This format can be derived using a combination of the current adjacency list format and the pipeline.steps
field.
Same thing applies to the condition step if-else branches. The goal of the adjacency list to give parent-child step relationship info. Nic can use the adjacency list + pipeline.steps to decipher whether the child of a condition step is a if-child or else-child.
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository |
) Co-authored-by: Namrata Madan <[email protected]>
) Co-authored-by: Namrata Madan <[email protected]>
Issue #, if available:
Description of changes: Added a helper method to create adjacency list representing the step dependency DAG within a sagemaker pipeline
Testing done: unit tests
Merge Checklist
Put an
x
in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your pull request.General
Tests
unique_name_from_base
to create resource names in integ tests (if appropriate)By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.