
Commit f94c8e8

feature: add spark processing support to processing jobs (aws#1894)
Co-authored-by: guoqiao <[email protected]>
1 parent 13c63bf commit f94c8e8

File tree

19 files changed: +4100 −29 lines changed

doc/amazon_sagemaker_processing.rst

Lines changed: 137 additions & 23 deletions
@@ -67,43 +67,155 @@ For an in-depth look, please see the `Scikit-learn Data Processing and Model Evaluation`_ example notebook.
.. _Scikit-learn Data Processing and Model Evaluation: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/scikit_learn_data_processing_and_model_evaluation.ipynb


Data Processing with Spark
==========================

SageMaker provides two classes for running Spark applications as processing jobs: :class:`sagemaker.spark.processing.PySparkProcessor` and :class:`sagemaker.spark.processing.SparkJarProcessor`.

PySparkProcessor
----------------

You can use the :class:`sagemaker.spark.processing.PySparkProcessor` class to run PySpark scripts as processing jobs.

This example shows how you can take an existing PySpark script and run it as a processing job with the :class:`PySparkProcessor` class and the pre-built SageMaker Spark container.

First, create a :class:`PySparkProcessor` object:

.. code:: python

    from sagemaker.processing import PySparkProcessor, ProcessingInput

    spark_processor = PySparkProcessor(
        base_job_name="sm-spark",
        framework_version="2.4",
        py_version="py37",
        container_version="1",
        role="[Your SageMaker-compatible IAM role]",
        instance_count=2,
        instance_type="ml.c5.xlarge",
        max_runtime_in_seconds=1200,
        image_uri="your-image-uri"
    )

``framework_version`` is the Spark version on which your script will run.
``py_version`` and ``container_version`` are two new parameters you can pass to the constructor. They give you finer control over the container version, so you can avoid backward incompatibilities and unnecessary dependency upgrades.

If you specify only ``framework_version``, SageMaker defaults to a Python version and the latest container version. To pin an exact version of the SageMaker Spark container, specify all three parameters: ``framework_version``, ``py_version``, and ``container_version``.

You can also specify ``image_uri``, which overrides all three of those parameters.

Note that the ``command`` option is not supported by either :class:`PySparkProcessor` or :class:`SparkJarProcessor`. If you want to run a script in your own container, use :class:`ScriptProcessor` instead.

Then you can run your existing Spark script ``preprocess.py`` as a processing job:

.. code:: python

    spark_processor.run(
        submit_app="preprocess.py",
        submit_py_files=["module.py", "lib.egg"],
        submit_jars=["lib.jar", "lib2.jar"],
        submit_files=["file.txt", "file2.csv"],
        arguments=["s3_input_bucket", bucket,
                   "s3_input_key_prefix", input_prefix,
                   "s3_output_bucket", bucket,
                   "s3_output_key_prefix", input_preprocessed_prefix],
        spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events"
    )

``submit_app`` is the local relative path or S3 path of your Python script; in this case it is ``preprocess.py``.

You can also specify any Python or jar dependencies or files that your script depends on with ``submit_py_files``, ``submit_jars``, and ``submit_files``.

``submit_py_files`` is a list of .zip, .egg, or .py files to place on the ``PYTHONPATH`` for Python apps. ``submit_jars`` is a list of jars to include on the driver and executor classpaths. ``submit_files`` is a list of files to be placed in the working directory of each executor; at runtime, these files can be accessed via ``SparkFiles.get(fileName)``.

Each item in these lists can be either an S3 path or a local path; if you have dependencies stored both in S3 and locally, you can mix them freely in ``submit_py_files``, ``submit_jars``, and ``submit_files``.
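
As an illustration (a sketch, not part of the SDK documentation), here is how ``preprocess.py`` could resolve a file passed through ``submit_files``, assuming ``file.txt`` from the ``run()`` call above:

.. code:: python

    from pyspark import SparkFiles
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("preprocess").getOrCreate()

    # submit_files places file.txt in the working directory of each executor;
    # SparkFiles.get() resolves its local path at runtime.
    with open(SparkFiles.get("file.txt")) as f:
        lookup_data = f.read()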

Just like with :class:`ScriptProcessor`, you can pass arguments to your script through the ``arguments`` parameter. In this example, four arguments are passed so that the script can download its input from, and upload its output to, S3.

To support the Spark history server, you can specify the ``spark_event_logs_s3_uri`` parameter when you invoke the ``run()`` method, so that Spark events are continuously uploaded to S3. Note that performance is slightly impacted if you publish Spark events to S3.

Spark History Server
--------------------

While your script is running, or after it has run, you can view the Spark UI by running the Spark history server locally or in a notebook. By default, the S3 URI you provided to the previous ``run()`` call is used as the Spark event source, but you can also specify a different URI. Note that only one history server process runs at a time; you can terminate it with ``terminate_history_server()``.

Here's an example of starting and terminating the history server:

.. code:: python

    spark_processor.start_history_server()
    spark_processor.terminate_history_server()

You don't have to run a script first to start the history server; you can instead point it at an S3 URI where Spark event logs are already stored. For example:

.. code:: python

    spark_processor.start_history_server(spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events")

To run the history server successfully, first make sure ``docker`` is installed on your machine. Then configure your AWS credentials with S3 read permission. Finally, either invoke ``run()`` with ``spark_event_logs_s3_uri`` beforehand, or pass ``spark_event_logs_s3_uri`` to ``start_history_server()``; otherwise starting the server will fail.

SparkJarProcessor
-----------------

Suppose you have the jar file ``preprocessing.jar`` stored in your current directory, and the Java class you want to run is ``com.path.to.your.class.PreProcessing``.
Here's an example of using :class:`SparkJarProcessor`:

.. code:: python

    spark = SparkJarProcessor(
        base_job_name="sm-spark-java",
        image_uri=beta_image_uri,
        role=role,
        instance_count=2,
        instance_type="ml.c5.xlarge",
        max_runtime_in_seconds=1200,
    )

    spark.run(
        submit_app="preprocessing.jar",
        submit_class="com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
        arguments=["--input", input_s3_uri, "--output", output_s3_uri]
    )

:class:`SparkJarProcessor` is very similar to :class:`PySparkProcessor`, except that its ``run()`` method takes only a jar file path, passed through the ``submit_app`` parameter, plus a ``submit_class`` parameter, which is equivalent to the ``--class`` option of the ``spark-submit`` command.

Configuration Override
----------------------

Overriding the Spark configuration is crucial for a number of tasks, such as tuning your Spark application or configuring the Hive metastore. With the Python SDK you can easily override the Spark/Hive/Hadoop configuration.

An example usage would be overriding the Spark executor memory and cores, as demonstrated in the following code snippet:

.. code:: python

    spark_processor = PySparkProcessor(
        base_job_name="sm-spark",
        image_uri=beta_image_uri,
        role=role,
        instance_count=2,
        instance_type="ml.c5.xlarge",
        max_runtime_in_seconds=1200,
    )

    configuration = [{
        "Classification": "spark-defaults",
        "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
    }]

    spark_processor.run(
        submit_app="./code/preprocess.py",
        arguments=["s3_input_bucket", bucket,
                   "s3_input_key_prefix", input_prefix_abalone,
                   "s3_output_bucket", bucket,
                   "s3_output_key_prefix", input_preprocessed_prefix_abalone],
        configuration=configuration,
        logs=False
    )

For an in-depth look at how to write your configuration, please see the `Amazon EMR Configuring Applications`_ documentation.

.. _Amazon EMR Configuring Applications: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

For an in-depth look, please see the `Feature Transformation with Spark`_ example notebook.

.. _Feature Transformation with Spark: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_processing/feature_transformation_with_sagemaker_processing/feature_transformation_with_sagemaker_processing.ipynb

@@ -118,6 +230,8 @@ Processing class documentation

- :class:`sagemaker.processing.Processor`
- :class:`sagemaker.processing.ScriptProcessor`
- :class:`sagemaker.sklearn.processing.SKLearnProcessor`
- :class:`sagemaker.spark.processing.PySparkProcessor`
- :class:`sagemaker.spark.processing.SparkJarProcessor`
- :class:`sagemaker.processing.ProcessingInput`
- :class:`sagemaker.processing.ProcessingOutput`
- :class:`sagemaker.processing.ProcessingJob`

src/sagemaker/image_uri_config/spark.json

Lines changed: 35 additions & 0 deletions
@@ -0,0 +1,35 @@
{
    "processing": {
        "processors": ["cpu"],
        "versions": {
            "2.4": {
                "py_versions": ["py37"],
                "registries": {
                    "me-south-1": "750251592176",
                    "ap-south-1": "105495057255",
                    "eu-west-3": "136845547031",
                    "us-east-2": "314815235551",
                    "eu-west-1": "571004829621",
                    "eu-central-1": "906073651304",
                    "sa-east-1": "737130764395",
                    "ap-east-1": "732049463269",
                    "us-east-1": "173754725891",
                    "ap-northeast-2": "860869212795",
                    "eu-west-2": "836651553127",
                    "ap-northeast-1": "411782140378",
                    "us-west-2": "153931337802",
                    "us-west-1": "667973535471",
                    "ap-southeast-1": "759080221371",
                    "ap-southeast-2": "440695851116",
                    "ca-central-1": "446299261295",
                    "cn-north-1": "671472414489",
                    "cn-northwest-1": "844356804704",
                    "eu-south-1": "753923664805",
                    "af-south-1": "309385258863",
                    "us-gov-west-1": "271483468897"
                },
                "repository": "sagemaker-spark-processing"
            }
        }
    }
}
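
A hedged usage sketch (not part of the commit): given this configuration and the ``image_uris.retrieve()`` changes shown below, a Spark processing image URI could be looked up roughly as follows. The argument values, and the ``container_version`` string in particular, are illustrative assumptions.

    from sagemaker import image_uris

    # "spark" is the framework key for this config file; the region-to-account
    # mapping and the "sagemaker-spark-processing" repository come from the JSON above.
    uri = image_uris.retrieve(
        "spark",
        region="us-west-2",
        version="2.4",
        py_version="py37",
        instance_type="ml.c5.xlarge",
        image_scope="processing",
        container_version="v1",  # assumed format; see _format_tag below
    )
    # Expected form: <account>.dkr.ecr.us-west-2.amazonaws.com/sagemaker-spark-processing:<tag>
    print(uri)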

src/sagemaker/image_uris.py

Lines changed: 14 additions & 6 deletions
@@ -19,6 +19,7 @@
import re

from sagemaker import utils
+from sagemaker.spark import defaults

logger = logging.getLogger(__name__)

@@ -33,6 +34,7 @@ def retrieve(
    instance_type=None,
    accelerator_type=None,
    image_scope=None,
+   container_version=None,
):
    """Retrieves the ECR URI for the Docker image matching the given arguments.
@@ -51,6 +53,7 @@ def retrieve(
        image_scope (str): The image type, i.e. what it is used for.
            Valid values: "training", "inference", "eia". If ``accelerator_type`` is set,
            ``image_scope`` is ignored.
+       container_version (str): the version of the Docker image.

    Returns:
        str: the ECR URI for the corresponding SageMaker Docker image.
@@ -63,7 +66,7 @@ def retrieve(
    version = _validate_version_and_set_if_needed(version, config, framework)
    version_config = config["versions"][_version_for_config(version, config)]

-   py_version = _validate_py_version_and_set_if_needed(py_version, version_config)
+   py_version = _validate_py_version_and_set_if_needed(py_version, version_config, framework)
    version_config = version_config.get(py_version) or version_config

    registry = _registry_from_region(region, version_config["registries"])
@@ -74,7 +77,9 @@ def retrieve(
    processor = _processor(
        instance_type, config.get("processors") or version_config.get("processors")
    )
-   tag = _format_tag(version_config.get("tag_prefix", version), processor, py_version)
+   tag = _format_tag(
+       version_config.get("tag_prefix", version), processor, py_version, container_version
+   )

    if tag:
        repo += ":{}".format(tag)
@@ -103,7 +108,7 @@ def _config_for_framework_and_scope(framework, image_scope, accelerator_type=None):
            available_scopes[0],
            image_scope,
        )
-       image_scope = available_scopes[0]
+       image_scope = list(available_scopes)[0]

    if not image_scope and "scope" in config and set(available_scopes) == {"training", "inference"}:
        logger.info(
@@ -212,7 +217,7 @@ def _processor(instance_type, available_processors):
    return processor


-def _validate_py_version_and_set_if_needed(py_version, version_config):
+def _validate_py_version_and_set_if_needed(py_version, version_config, framework):
    """Checks if the Python version is one of the supported versions."""
    if "repository" in version_config:
        available_versions = version_config.get("py_versions")
@@ -224,6 +229,9 @@ def _validate_py_version_and_set_if_needed(py_version, version_config):
        logger.info("Ignoring unnecessary Python version: %s.", py_version)
        return None

+   if py_version is None and defaults.SPARK_NAME == framework:
+       return None
+
    if py_version is None and len(available_versions) == 1:
        logger.info("Defaulting to only available Python version: %s", available_versions[0])
        return available_versions[0]
@@ -242,6 +250,6 @@ def _validate_arg(arg, available_options, arg_name):
    )


-def _format_tag(tag_prefix, processor, py_version):
+def _format_tag(tag_prefix, processor, py_version, container_version):
    """Creates a tag for the image URI."""
-   return "-".join([x for x in (tag_prefix, processor, py_version) if x])
+   return "-".join([x for x in (tag_prefix, processor, py_version, container_version) if x])

src/sagemaker/processing.py

Lines changed: 4 additions & 0 deletions
@@ -166,6 +166,10 @@ def run(
        if wait:
            self.latest_job.wait(logs=logs)

+   def _extend_processing_args(self, inputs, outputs, **kwargs):  # pylint: disable=W0613
+       """Extend inputs and outputs based on extra parameters"""
+       return inputs, outputs
+
    def _normalize_args(self, job_name=None, arguments=None, inputs=None, outputs=None, code=None):
        """Normalizes the arguments so that they can be passed to the job run
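
The new ``_extend_processing_args`` hook is a no-op in the base class; a subclass can override it to fold extra keyword arguments into the job's inputs and outputs (presumably the Spark processors added in this commit are the intended users). A minimal, hypothetical sketch of such an override:

    from sagemaker.processing import ScriptProcessor

    class MyProcessor(ScriptProcessor):
        def _extend_processing_args(self, inputs, outputs, **kwargs):
            # "extra_input" is a made-up keyword used only for this sketch.
            extra_input = kwargs.get("extra_input")
            if extra_input is not None:
                inputs = (inputs or []) + [extra_input]
            return inputs, outputs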

src/sagemaker/spark/__init__.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Placeholder docstring"""
from __future__ import absolute_import

from sagemaker.spark.processing import PySparkProcessor, SparkJarProcessor  # noqa: F401
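
With this re-export in place, the processors can be imported from either module path (a small sanity check, assuming the package is installed):

    from sagemaker.spark import PySparkProcessor
    from sagemaker.spark.processing import PySparkProcessor as _SameClass

    assert PySparkProcessor is _SameClass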

src/sagemaker/spark/defaults.py

Lines changed: 16 additions & 0 deletions
@@ -0,0 +1,16 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Default constants used by Spark processing."""
from __future__ import absolute_import

SPARK_NAME = "spark"

0 commit comments
