Skip to content

Commit 25ac296

Browse files
giuseppeporcelliajaykarpuricywang86ruimetrizable
authored
feat: Support local mode for Amazon SageMaker Processing jobs (#1961)
Co-authored-by: Ajay Karpur <[email protected]> Co-authored-by: icywang86rui <[email protected]> Co-authored-by: Eric Johnson <[email protected]>
1 parent dd155d0 commit 25ac296

File tree

8 files changed

+879
-6
lines changed

8 files changed

+879
-6
lines changed

src/sagemaker/local/entities.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,142 @@
3838
HEALTH_CHECK_TIMEOUT_LIMIT = 120
3939

4040

41+
class _LocalProcessingJob:
42+
"""Defines and starts a local processing job."""
43+
44+
_STARTING = "Starting"
45+
_PROCESSING = "Processing"
46+
_COMPLETED = "Completed"
47+
48+
def __init__(self, container):
49+
"""Creates a local processing job.
50+
51+
Args:
52+
container: the local container object.
53+
"""
54+
self.container = container
55+
self.state = "Created"
56+
self.start_time = None
57+
self.end_time = None
58+
self.processing_job_name = ""
59+
self.processing_inputs = None
60+
self.processing_output_config = None
61+
self.environment = None
62+
63+
def start(self, processing_inputs, processing_output_config, environment, processing_job_name):
64+
"""Starts a local processing job.
65+
66+
Args:
67+
processing_inputs: The processing input configuration.
68+
processing_output_config: The processing input configuration.
69+
environment: The collection of environment variables passed to the job.
70+
processing_job_name: The processing job name.
71+
"""
72+
self.state = self._STARTING
73+
74+
for item in processing_inputs:
75+
if "DatasetDefinition" in item:
76+
raise RuntimeError("DatasetDefinition is not currently supported in Local Mode")
77+
78+
try:
79+
s3_input = item["S3Input"]
80+
except KeyError:
81+
raise ValueError("Processing input must have a valid ['S3Input']")
82+
83+
item["DataUri"] = s3_input["S3Uri"]
84+
85+
if "S3InputMode" in s3_input and s3_input["S3InputMode"] != "File":
86+
raise RuntimeError(
87+
"S3InputMode: %s is not currently supported in Local Mode"
88+
% s3_input["S3InputMode"]
89+
)
90+
91+
if (
92+
"S3DataDistributionType" in s3_input
93+
and s3_input["S3DataDistributionType"] != "FullyReplicated"
94+
):
95+
raise RuntimeError(
96+
"DataDistribution: %s is not currently supported in Local Mode"
97+
% s3_input["S3DataDistributionType"]
98+
)
99+
100+
if "S3CompressionType" in s3_input and s3_input["S3CompressionType"] != "None":
101+
raise RuntimeError(
102+
"CompressionType: %s is not currently supported in Local Mode"
103+
% s3_input["S3CompressionType"]
104+
)
105+
106+
if processing_output_config and "Outputs" in processing_output_config:
107+
processing_outputs = processing_output_config["Outputs"]
108+
109+
for item in processing_outputs:
110+
if "FeatureStoreOutput" in item:
111+
raise RuntimeError(
112+
"FeatureStoreOutput is not currently supported in Local Mode"
113+
)
114+
115+
try:
116+
s3_output = item["S3Output"]
117+
except KeyError:
118+
raise ValueError("Processing output must have a valid ['S3Output']")
119+
120+
if s3_output["S3UploadMode"] != "EndOfJob":
121+
raise RuntimeError(
122+
"UploadMode: %s is not currently supported in Local Mode."
123+
% s3_output["S3UploadMode"]
124+
)
125+
126+
self.start_time = datetime.datetime.now()
127+
self.state = self._PROCESSING
128+
129+
self.processing_job_name = processing_job_name
130+
self.processing_inputs = processing_inputs
131+
self.processing_output_config = processing_output_config
132+
self.environment = environment
133+
134+
self.container.process(
135+
processing_inputs, processing_output_config, environment, processing_job_name
136+
)
137+
138+
self.end_time = datetime.datetime.now()
139+
self.state = self._COMPLETED
140+
141+
def describe(self):
142+
"""Describes a local processing job.
143+
144+
Returns:
145+
An object describing the processing job.
146+
"""
147+
148+
response = {
149+
"ProcessingJobArn": self.processing_job_name,
150+
"ProcessingJobName": self.processing_job_name,
151+
"AppSpecification": {
152+
"ImageUri": self.container.image,
153+
"ContainerEntrypoint": self.container.container_entrypoint,
154+
"ContainerArguments": self.container.container_arguments,
155+
},
156+
"Environment": self.environment,
157+
"ProcessingInputs": self.processing_inputs,
158+
"ProcessingOutputConfig": self.processing_output_config,
159+
"ProcessingResources": {
160+
"ClusterConfig": {
161+
"InstanceCount": self.container.instance_count,
162+
"InstanceType": self.container.instance_type,
163+
"VolumeSizeInGB": 30,
164+
"VolumeKmsKeyId": None,
165+
}
166+
},
167+
"RoleArn": "<no_role>",
168+
"StoppingCondition": {"MaxRuntimeInSeconds": 86400},
169+
"ProcessingJobStatus": self.state,
170+
"ProcessingStartTime": self.start_time,
171+
"ProcessingEndTime": self.end_time,
172+
}
173+
174+
return response
175+
176+
41177
class _LocalTrainingJob(object):
42178
"""Placeholder docstring"""
43179

0 commit comments

Comments
 (0)