feature: Apache Airflow integration for SageMaker Processing Jobs #1620
AWS CodeBuild CI Report
Powered by github-codebuild-logs, available on the AWS Serverless Application Repository
nice work! thanks for the contribution. mostly style comments.
src/sagemaker/processing.py
Outdated
    config = {}
    if container_arguments is not None:
        config["ContainerArguments"] = container_arguments
    if container_entrypoint is not None:
        config["ContainerEntrypoint"] = container_entrypoint
    config["ImageUri"] = image_uri
since "ImageUri" is always a key of config, it's more pythonic to initialize the dict with it:
    config = {"ImageUri": image_uri}
    if container_arguments ...
Fixed in the next revision.
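The suggestion above can be sketched as a standalone function. The argument order mirrors the signature shown in the diff; the body is an assumption about what the revision might look like, not the merged implementation, and the image name in the usage line is a placeholder.

```python
def prepare_app_specification(container_arguments, container_entrypoint, image_uri):
    """Build an AppSpecification-style dict (illustrative sketch only)."""
    # "ImageUri" is always present, so it seeds the dict literal.
    config = {"ImageUri": image_uri}
    # Optional keys are added only when the caller supplied a value.
    if container_arguments is not None:
        config["ContainerArguments"] = container_arguments
    if container_entrypoint is not None:
        config["ContainerEntrypoint"] = container_entrypoint
    return config


# Placeholder values, for illustration only.
spec = prepare_app_specification(["--epochs", "1"], None, "my-image:latest")
print(spec)
```

Seeding the literal with the mandatory key keeps the required and optional parts of the config visually separate.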
src/sagemaker/processing.py
Outdated
    config = {}
    if kms_key_id is not None:
        config["KmsKeyId"] = kms_key_id
    config["Outputs"] = outputs
since "Outputs" is always present as a key, initialize config with it:
    config = {"Outputs": outputs}
    if kms_key_id ...
Fixed in the next revision.
src/sagemaker/processing.py
Outdated
    stopping_condition = {}
    stopping_condition["MaxRuntimeInSeconds"] = max_runtime_in_seconds
    return stopping_condition
just:
return {"MaxRuntimeInSeconds": max_runtime_in_seconds}
Fixed in the next revision.
src/sagemaker/processing.py
Outdated
@@ -739,6 +739,66 @@ def stop(self):
        """Stops the processing job."""
        self.sagemaker_session.stop_processing_job(self.name)

    @staticmethod
    def _prepare_app_specification(container_arguments, container_entrypoint, image_uri):
probably don't want the underscore preceding the name? although it places no practical limit on accessibility, a leading underscore conventionally marks a method as "protected".
there are also some material ramifications. from the public names section here:
If __all__ is not defined, the set of public names includes all names found in the module’s namespace which do not begin with an underscore character ('_')
Removed the underscore in the next revision.
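The quoted rule concerns module-level names under a star import. A small sketch of that behaviour, using a throwaway in-memory module; the module name `demo_mod` and both function names are hypothetical and exist only for this illustration:

```python
import sys
import types

# Build a throwaway module in memory (hypothetical name: demo_mod).
demo_mod = types.ModuleType("demo_mod")
exec(
    "def prepare_config():\n"
    "    return 'public'\n"
    "\n"
    "def _prepare_config():\n"
    "    return 'non-public by convention'\n",
    demo_mod.__dict__,
)
sys.modules["demo_mod"] = demo_mod

# With no __all__ defined, a star import skips underscore-prefixed names.
namespace = {}
exec("from demo_mod import *", namespace)
star_imported = sorted(name for name in namespace if not name.startswith("__"))
print(star_imported)

# The underscore is only a convention: explicit access still works.
print(demo_mod._prepare_config())
```

The underscore does not block access, but it changes what tooling and readers treat as part of the public surface, which is why a helper called from another module reads better without it.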
src/sagemaker/processing.py
Outdated

    @staticmethod
    def _prepare_output_config(kms_key_id, outputs):
        """
nit-pep257: include summary line
(i know the preceding underscore indicates the method is not "public", but it is used outside of this class/subclasses, so it probably shouldn't have the underscore)
Fixed the underscore and added summary lines, descriptions of args and type for all of these functions.
src/sagemaker/processing.py
Outdated
    def _prepare_output_config(kms_key_id, outputs):
        """
        Args:
            kms_key_id:
use google docstring conventions and include brief descriptions of args. we typically also indicate type in parentheses.
Fixed the underscore and added summary lines, descriptions of args and type for all of these functions.
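For reference, a sketch of what a Google-style docstring with a one-line summary could look like on this function. The summary echoes the wording quoted in the diff; the argument descriptions, the types, and the function body are assumptions made for illustration rather than the text of the actual revision.

```python
def prepare_output_config(kms_key_id, outputs):
    """Prepares a dict that represents a ProcessingOutputConfig.

    Args:
        kms_key_id (str): The KMS key used to encrypt the job's output,
            or None to leave the key out of the config. (Assumed wording.)
        outputs (list[dict]): Output definitions already converted to
            request dicts. (Assumed type.)

    Returns:
        dict: A config with an "Outputs" key and, when a key ID is given,
            a "KmsKeyId" key.
    """
    config = {"Outputs": outputs}
    if kms_key_id is not None:
        config["KmsKeyId"] = kms_key_id
    return config


# The first docstring line is the PEP 257 summary line.
summary = prepare_output_config.__doc__.splitlines()[0]
print(summary)
```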
src/sagemaker/workflow/airflow.py
Outdated
        else utils.base_name_from_image(processor.image_uri)
    )

    input_dict = []
maybe call it input_dicts, since it's actually a list of dicts?
Fixed in the next revision.
src/sagemaker/workflow/airflow.py
Outdated
    for i in inputs:
        input_dict.append(i._to_request_dict())
more pythonic (and performs better) to use list comprehension:
input_dicts = [i._to_request_dict() for i in inputs]
Fixed in the next revision.
src/sagemaker/workflow/airflow.py
Outdated
    output_dict = []
    for output in outputs:
        output_dict.append(output._to_request_dict())
same:
    output_dicts = [output._to_request_dict() for output in outputs]
and it's very similar to the previous transformation on inputs. maybe a helper function to encapsulate the shared logic, invoked for both?
Added a helper function in the next revision.
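One possible shape for such a helper, as a minimal sketch. The helper name `to_request_dicts`, the stand-in class `FakeProcessingItem`, and its simplified dict keys are all hypothetical; only the `_to_request_dict()` method name comes from the diff, and treating `None` as an empty list is an added assumption.

```python
class FakeProcessingItem:
    """Hypothetical stand-in for an SDK input or output object."""

    def __init__(self, name, s3_uri):
        self.name = name
        self.s3_uri = s3_uri

    def _to_request_dict(self):
        # Simplified shape for illustration; not the real request format.
        return {"Name": self.name, "S3Uri": self.s3_uri}


def to_request_dicts(items):
    """Convert each item to its request dict; None is treated as empty."""
    return [item._to_request_dict() for item in (items or [])]


inputs = [FakeProcessingItem("in-1", "s3://bucket/in")]
outputs = [FakeProcessingItem("out-1", "s3://bucket/out")]

# The same helper serves both transformations.
input_dicts = to_request_dicts(inputs)
output_dicts = to_request_dicts(outputs)
print(input_dicts, output_dicts)
```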
Force-pushed from aff3deb to 95bd791.
src/sagemaker/processing.py
Outdated
    processing_resources = {}
    cluster_config = {}
    if volume_kms_key_id is not None:
        cluster_config["VolumeKmsKeyId"] = volume_kms_key_id
    cluster_config["InstanceCount"] = instance_count
    cluster_config["InstanceType"] = instance_type
    cluster_config["VolumeSizeInGB"] = volume_size_in_gb
    processing_resources["ClusterConfig"] = cluster_config
    return processing_resources
nit-style: initialize dict with values?
    cluster_config = {
        "InstanceCount": instance_count,
        ...
    }
    ...
    return {"ClusterConfig": cluster_config}
Fixed in the next revision.
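Filled in, the reviewer's outline might look like the sketch below. The function name `prepare_processing_resources` and its parameter order are hypothetical, since the diff excerpt does not show the enclosing signature; the keys are the ones that appear in the snippet, and the instance values in the usage line are placeholders.

```python
def prepare_processing_resources(
    instance_count, instance_type, volume_kms_key_id, volume_size_in_gb
):
    """Build a ProcessingResources-style dict (illustrative sketch only)."""
    # The three mandatory values go straight into the literal.
    cluster_config = {
        "InstanceCount": instance_count,
        "InstanceType": instance_type,
        "VolumeSizeInGB": volume_size_in_gb,
    }
    # The optional key is added only when a value was provided.
    if volume_kms_key_id is not None:
        cluster_config["VolumeKmsKeyId"] = volume_kms_key_id
    return {"ClusterConfig": cluster_config}


resources = prepare_processing_resources(1, "ml.m5.xlarge", None, 30)
print(resources)
```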
src/sagemaker/processing.py
Outdated

    @staticmethod
    def prepare_output_config(kms_key_id, outputs):
        """Prepares a dict that represents a ProcessingOutputConfig,
nit-pep257: single line summary, additional information after blank line in body.
Fixed in the next revision. (moved the extra information to describe what the function returns.)
src/sagemaker/processing.py
Outdated
@@ -739,6 +739,94 @@ def stop(self):
        """Stops the processing job."""
        self.sagemaker_session.stop_processing_job(self.name)

    @staticmethod
    def prepare_app_specification(container_arguments, container_entrypoint, image_uri):
        """Prepares a dict that represents a ProcessingJob's AppSpecification
nit-pep257
Fixed in the next revision. (moved the extra information to describe what the function returns.)
Force-pushed from 95bd791 to 35ced0a.
Force-pushed from 35ced0a to 190f09f.
ship it
ship it
Issue #, if available:
Description of changes:
This change enables an Apache Airflow operator for SageMaker Processing Jobs through a processing_config that can be passed to an Airflow operator.
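To give a rough idea of the kind of dict such a config produces, here is a minimal stdlib-only sketch. The top-level field names follow the SageMaker CreateProcessingJob request shape; the function `build_processing_config`, its parameters, and every value in the usage lines are hypothetical placeholders, and this is not the SDK function added by the PR.

```python
def build_processing_config(
    job_name,
    role_arn,
    app_specification,
    processing_resources,
    stopping_condition=None,
    input_dicts=None,
    output_config=None,
):
    """Assemble a CreateProcessingJob-style request dict (sketch only)."""
    config = {
        "ProcessingJobName": job_name,
        "RoleArn": role_arn,
        "AppSpecification": app_specification,
        "ProcessingResources": processing_resources,
    }
    # Optional sections are included only when the caller provides them.
    if stopping_condition is not None:
        config["StoppingCondition"] = stopping_condition
    if input_dicts:
        config["ProcessingInputs"] = input_dicts
    if output_config is not None:
        config["ProcessingOutputConfig"] = output_config
    return config


# Placeholder values, for illustration only.
config = build_processing_config(
    job_name="example-processing-job",
    role_arn="arn:aws:iam::123456789012:role/ExampleRole",
    app_specification={"ImageUri": "my-image:latest"},
    processing_resources={
        "ClusterConfig": {
            "InstanceCount": 1,
            "InstanceType": "ml.m5.xlarge",
            "VolumeSizeInGB": 30,
        }
    },
    stopping_condition={"MaxRuntimeInSeconds": 3600},
)
print(sorted(config))
```

A plain dict like this can be handed to an operator's configuration argument without the operator needing to import the SDK's processor classes.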
Testing done:
Added a unit test for this config. Also tested this with local changes to the Airflow repository that add the Operator and was able to create/run a DAG for ProcessingJobs similar to the examples here.
Merge Checklist
Put an x in the boxes that apply. You can also fill these out after creating the PR. If you're unsure about any of them, don't hesitate to ask. We're here to help! This is simply a reminder of what we are going to look for before merging your pull request.
General
Tests
unique_name_from_base to create resource names in integ tests (if appropriate)
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.