Skip to content

feat: Support local mode for Amazon SageMaker Processing jobs #1961

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
1ac3a7a
Initial work on SM Processing local mode support.
Aug 31, 2020
3a13212
Work in progress on local mode for SM Processing.
Sep 8, 2020
8682d56
Minor changes.
Sep 9, 2020
f43c033
Implemented upload to S3 of local processing job results. Written pro…
Sep 21, 2020
836ed94
Fixed instance count.
Sep 21, 2020
8680212
Fixed KMS value None.
Sep 21, 2020
9d86076
Merge remote-tracking branch 'upstream/master'
Sep 21, 2020
c139e2a
Changed entrypoint format and removed some tracing prints.
Sep 23, 2020
6de23d5
Improved describe processing job in local mode. Added support for loc…
Sep 24, 2020
3f30b0d
Enforced custom container name in docker host configuration.
Sep 24, 2020
ec07127
Merge remote-tracking branch 'upstream/master'
Oct 7, 2020
776887d
Fixed pylint errors.
Oct 7, 2020
1f986ce
Merging upstream/master.
Oct 19, 2020
49e7699
Added initial unit tests and related fixes.
Oct 19, 2020
e2b57bd
Fixed compression type default to 'None'.
Oct 19, 2020
5bde004
Merge branch 'master' into master
ajaykarpur Oct 20, 2020
8737bf0
Merge branch 'master' into master
icywang86rui Oct 26, 2020
9fe4ff7
Merge branch 'master' into master
metrizable Nov 3, 2020
f17c0fc
Merge remote-tracking branch 'upstream/master'
Dec 17, 2020
520c101
Merge branch 'master' of https://github.com/giuseppeporcelli/sagemake…
Dec 17, 2020
a846b32
Added doc strings to entities.py, implemented integ tests and fixed t…
Dec 17, 2020
14340ae
Improved docstrings, handled DatasetDefinition and FeatureStoreOutput…
Dec 18, 2020
2bb37c5
Improved abstraction on Processor objects when handling local session…
Dec 18, 2020
5110995
Minor indent fix.
Dec 18, 2020
6bcffa7
Reformatted.
Dec 18, 2020
69710d9
Merge branch 'master' into master
ajaykarpur Dec 21, 2020
44656da
Merge branch 'master' into master
metrizable Dec 22, 2020
8614324
Merge branch 'master' into master
metrizable Dec 22, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions src/sagemaker/local/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,135 @@
HEALTH_CHECK_TIMEOUT_LIMIT = 120


class _LocalProcessingJob(object):
"""Placeholder docstring"""

_STARTING = "Starting"
_PROCESSING = "Processing"
_COMPLETED = "Completed"
_states = ["Starting", "Processing", "Completed"]

def __init__(self, container):
"""
Args:
container:
"""
self.container = container
self.state = "Created"
self.start_time = None
self.end_time = None
self.processing_job_name = ""
self.processing_inputs = None
self.processing_output_config = None
self.environment = None

def start(self, processing_inputs, processing_output_config, environment, processing_job_name):
"""
Args:
processing_inputs:
processing_output_config:
environment:
processing_job_name:
"""

for item in processing_inputs:
if item["S3Input"]:
data_uri = item["S3Input"]["S3Uri"]
else:
raise ValueError("Processing input must have a valid ['S3Input']")

if item["S3Input"]["S3InputMode"]:
input_mode = item["S3Input"]["S3InputMode"]
else:
raise ValueError("Processing input must have a valid ['S3InputMode']")

item["DataUri"] = data_uri

if (
"S3DataDistributionType" in item["S3Input"]
and item["S3Input"]["S3DataDistributionType"] != "FullyReplicated"
):

raise RuntimeError(
"DataDistribution: %s is not currently supported in Local Mode"
% item["S3Input"]["S3DataDistributionType"]
)

if input_mode != "File":
raise RuntimeError(
"S3InputMode: %s is not currently supported in Local Mode" % input_mode
)

if (
"S3CompressionType" in item["S3Input"]
and item["S3Input"]["S3CompressionType"] != "None"
):

raise RuntimeError(
"CompressionType: %s is not currently supported in Local Mode"
% item["S3Input"]["S3CompressionType"]
)

if processing_output_config and "Outputs" in processing_output_config:
processing_outputs = processing_output_config["Outputs"]

for item in processing_outputs:
if item["S3Output"]:
upload_mode = item["S3Output"]["S3UploadMode"]
else:
raise ValueError("Processing output must have a valid ['S3Output']")

if upload_mode != "EndOfJob":
raise RuntimeError(
"UploadMode: %s is not currently supported in Local Mode." % upload_mode
)

self.start_time = datetime.datetime.now()
self.state = self._PROCESSING

self.processing_job_name = processing_job_name
self.processing_inputs = processing_inputs
self.processing_output_config = processing_output_config
self.environment = environment

self.container.process(
processing_inputs, processing_output_config, environment, processing_job_name
)
self.end_time = datetime.datetime.now()
self.state = self._COMPLETED

def describe(self):
"""Placeholder docstring"""

response = {
"ProcessingJobArn": self.processing_job_name,
"ProcessingJobName": self.processing_job_name,
"AppSpecification": {
"ImageUri": self.container.image,
"ContainerEntrypoint": self.container.container_entrypoint,
"ContainerArguments": self.container.container_arguments,
},
"Environment": self.environment,
"ProcessingInputs": self.processing_inputs,
"ProcessingOutputConfig": self.processing_output_config,
"ProcessingResources": {
"ClusterConfig": {
"InstanceCount": self.container.instance_count,
"InstanceType": self.container.instance_type,
"VolumeSizeInGB": 30,
"VolumeKmsKeyId": None,
}
},
"RoleArn": "<no_role>",
"StoppingCondition": {"MaxRuntimeInSeconds": 86400},
"ProcessingJobStatus": self.state,
"ProcessingStartTime": self.start_time,
"ProcessingEndTime": self.end_time,
}

return response


class _LocalTrainingJob(object):
"""Placeholder docstring"""

Expand Down
Loading