
feature: add spark processing support to processing jobs #1894

Merged 3 commits on Sep 16, 2020
160 changes: 137 additions & 23 deletions doc/amazon_sagemaker_processing.rst
@@ -67,43 +67,155 @@ For an in-depth look, please see the `Scikit-learn Data Processing and Model Eva
.. _Scikit-learn Data Processing and Model Evaluation: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_processing/scikit_learn_data_processing_and_model_evaluation/scikit_learn_data_processing_and_model_evaluation.ipynb


Data Processing with Spark
==========================
SageMaker provides two classes for running Spark applications as processing jobs: :class:`sagemaker.spark.processing.PySparkProcessor` and :class:`sagemaker.spark.processing.SparkJarProcessor`.
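
Both classes live in the ``sagemaker.spark.processing`` module introduced by this change, so a typical import looks like this:

.. code:: python

from sagemaker.spark.processing import PySparkProcessor, SparkJarProcessor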

PySparkProcessor
---------------------

You can use the :class:`sagemaker.spark.processing.PySparkProcessor` class to run PySpark scripts as processing jobs.

This example shows how you can take an existing PySpark script and run a processing job with the :class:`sagemaker.spark.processing.PySparkProcessor` class and the pre-built SageMaker Spark container.

First, you need to create a :class:`PySparkProcessor` object:

.. code:: python

from sagemaker.spark.processing import PySparkProcessor

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    py_version="py37",
    container_version="1",
    role="[Your SageMaker-compatible IAM role]",
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

``framework_version`` is the Spark version that your script runs on.
``py_version`` and ``container_version`` are two new parameters you can specify in the constructor. They give you more flexibility to pin the container version and avoid backward incompatibilities and unnecessary dependency upgrades.

If you specify only ``framework_version``, SageMaker defaults to a Python version and the latest container version. To pin an exact version of the SageMaker Spark container, you need to specify all three parameters: ``framework_version``, ``py_version``, and ``container_version``.

You can also specify ``image_uri``, which overrides all three of these parameters.
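
For illustration, here is a minimal sketch of overriding the container with your own image; the ECR URI is a placeholder, and when ``image_uri`` is set the three version parameters are ignored:

.. code:: python

from sagemaker.spark.processing import PySparkProcessor

# Placeholder image URI; image_uri takes precedence over framework_version,
# py_version, and container_version.
spark_processor = PySparkProcessor(
    base_job_name="sm-spark-custom-image",
    image_uri="<ECR repository URI to your Spark processing image>",
    role="[Your SageMaker-compatible IAM role]",
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)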

Note that the ``command`` option is not supported by either :class:`PySparkProcessor` or :class:`SparkJarProcessor`. If you want to run the script on your own container, use :class:`ScriptProcessor` instead.

Then you can run your existing Spark script ``preprocess.py`` in a processing job:

.. code:: python

spark_processor.run(
    submit_app="preprocess.py",
    submit_py_files=["module.py", "lib.egg"],
    submit_jars=["lib.jar", "lib2.jar"],
    submit_files=["file.txt", "file2.csv"],
    arguments=["s3_input_bucket", bucket,
               "s3_input_key_prefix", input_prefix,
               "s3_output_bucket", bucket,
               "s3_output_key_prefix", input_preprocessed_prefix],
    spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events",
)

``submit_app`` is the local relative path or S3 URI of your Python script; in this case, it's ``preprocess.py``.

You can also specify any Python or jar dependencies or files that your script depends on with ``submit_py_files``, ``submit_jars``, and ``submit_files``.

``submit_py_files`` is a list of .zip, .egg, or .py files to place on the ``PYTHONPATH`` for Python apps. ``submit_jars`` is a list of jars to include on the driver and executor classpaths. ``submit_files`` is a list of files to be placed in the working directory of each executor; inside an executor, these files can be accessed via ``SparkFiles.get(fileName)``.

Each item in these lists can be either an S3 URI or a local path, and if you have dependencies stored both in S3 and locally, you can mix them in ``submit_py_files``, ``submit_jars``, and ``submit_files``.
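
For example, a run that mixes local and S3-hosted dependencies might look like the following sketch; the bucket and file names are placeholders:

.. code:: python

spark_processor.run(
    submit_app="preprocess.py",
    # Local paths and S3 URIs can be mixed freely within each list.
    submit_py_files=["module.py", "s3://your-bucket/deps/lib.egg"],
    submit_jars=["s3://your-bucket/deps/lib.jar"],
    submit_files=["file.txt"],
)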

Just like with :class:`ScriptProcessor`, you can pass arguments to your script by specifying the ``arguments`` parameter. In this example, four arguments are passed so the script can download its input from S3 and upload its output back to S3.

To support the Spark history server, you can specify the ``spark_event_logs_s3_uri`` parameter when you invoke the ``run()`` method to continuously upload Spark events to S3. Note that performance is slightly impacted if you decide to publish Spark events to S3.

Spark History Server
---------------------

While the script is running, or after it has run, you can view the Spark UI by running the history server locally or in a notebook. By default, the S3 URI you provided in the previous ``run()`` call is used as the Spark event source, but you can also specify a different URI. Finally, you can terminate the history server with ``terminate_history_server()``. Note that only one history server process runs at a time.

Here's an example of starting and terminating the history server:

.. code:: python

spark_processor.start_history_server()
spark_processor.terminate_history_server()

You don't always have to run a script first to start the history server; you can also specify an S3 URI where Spark event logs are already stored. For example:

.. code:: python

spark_processor.start_history_server(spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events")

To run the history server successfully, first make sure ``docker`` is installed on your machine. Then configure your AWS credentials with S3 read permission. Finally, either invoke the ``run()`` method with ``spark_event_logs_s3_uri`` first, or specify ``spark_event_logs_s3_uri`` in the ``start_history_server()`` call; otherwise, starting the history server will fail.
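
Putting the pieces above together, a minimal end-to-end sketch looks like this:

.. code:: python

spark_processor.run(
    submit_app="preprocess.py",
    spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events",
)
spark_processor.start_history_server(
    spark_event_logs_s3_uri="s3://your-bucket/your-prefix/store-spark-events"
)
# When you are done inspecting the Spark UI:
spark_processor.terminate_history_server()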

SparkJarProcessor
---------------------

Suppose you have the jar file ``preprocessing.jar`` stored in your current working directory and that its main class is ``com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp``. Here's an example of using :class:`SparkJarProcessor`:

.. code:: python

from sagemaker.spark.processing import SparkJarProcessor

spark = SparkJarProcessor(
    base_job_name="sm-spark-java",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

spark.run(
    submit_app="preprocessing.jar",
    submit_class="com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
    arguments=["--input", input_s3_uri, "--output", output_s3_uri],
)

:class:`SparkJarProcessor` is very similar to :class:`PySparkProcessor`, except that its ``run()`` method takes the jar file path in the ``submit_app`` parameter plus a ``submit_class`` parameter, which is equivalent to the ``--class`` option of the ``spark-submit`` command.
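
As with :class:`PySparkProcessor`, dependency jars and extra files can be passed to ``run()``. A short sketch, where the jar and file names are placeholders:

.. code:: python

spark.run(
    submit_app="preprocessing.jar",
    submit_class="com.amazonaws.sagemaker.spark.test.HelloJavaSparkApp",
    # submit_jars and submit_files accept local paths or S3 URIs.
    submit_jars=["s3://your-bucket/deps/helper.jar"],
    submit_files=["config.properties"],
    arguments=["--input", input_s3_uri, "--output", output_s3_uri],
)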

Configuration Override
----------------------

Overriding Spark configuration is crucial for a number of tasks, such as tuning your Spark application or configuring the Hive metastore. Using the SageMaker Python SDK, you can easily override Spark/Hive/Hadoop configuration.

An example usage would be overriding Spark executor memory/cores as demonstrated in the following code snippet:

.. code:: python

spark_processor = PySparkProcessor(
    base_job_name="sm-spark",
    framework_version="2.4",
    role=role,
    instance_count=2,
    instance_type="ml.c5.xlarge",
    max_runtime_in_seconds=1200,
)

configuration = [{
    "Classification": "spark-defaults",
    "Properties": {"spark.executor.memory": "2g", "spark.executor.cores": "1"},
}]

spark_processor.run(
    submit_app="./code/preprocess.py",
    arguments=["s3_input_bucket", bucket,
               "s3_input_key_prefix", input_prefix_abalone,
               "s3_output_bucket", bucket,
               "s3_output_key_prefix", input_preprocessed_prefix_abalone],
    configuration=configuration,
    logs=False,
)
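
Configuration is not limited to ``spark-defaults``. As a further sketch, assuming your Spark image accepts the ``hive-site`` classification (see the EMR document below for the available classifications and their syntax), you could point the Hive metastore at an external database; the endpoint and credentials shown are placeholders:

.. code:: python

configuration = [{
    "Classification": "hive-site",
    "Properties": {
        # Placeholder JDBC endpoint and credentials for an external metastore.
        "javax.jdo.option.ConnectionURL": "jdbc:mysql://your-metastore-host:3306/hive",
        "javax.jdo.option.ConnectionUserName": "your-username",
        "javax.jdo.option.ConnectionPassword": "your-password",
    },
}]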

For an in-depth look at how to write your configuration, please see the `Amazon EMR Configuring Applications`_ document.

.. _Amazon EMR Configuring Applications: https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps.html

For an in-depth look, please see the `Feature Transformation with Spark`_ example notebook.

.. _Feature Transformation with Spark: https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_processing/feature_transformation_with_sagemaker_processing/feature_transformation_with_sagemaker_processing.ipynb
@@ -118,6 +230,8 @@ Processing class documentation
- :class:`sagemaker.processing.Processor`
- :class:`sagemaker.processing.ScriptProcessor`
- :class:`sagemaker.sklearn.processing.SKLearnProcessor`
- :class:`sagemaker.spark.processing.PySparkProcessor`
- :class:`sagemaker.spark.processing.SparkJarProcessor`
- :class:`sagemaker.processing.ProcessingInput`
- :class:`sagemaker.processing.ProcessingOutput`
- :class:`sagemaker.processing.ProcessingJob`
35 changes: 35 additions & 0 deletions src/sagemaker/image_uri_config/spark.json
@@ -0,0 +1,35 @@
{
"processing": {
"processors": ["cpu"],
"versions": {
"2.4": {
"py_versions": ["py37"],
"registries": {
"me-south-1": "750251592176",
"ap-south-1": "105495057255",
"eu-west-3": "136845547031",
"us-east-2": "314815235551",
"eu-west-1": "571004829621",
"eu-central-1": "906073651304",
"sa-east-1": "737130764395",
"ap-east-1": "732049463269",
"us-east-1": "173754725891",
"ap-northeast-2": "860869212795",
"eu-west-2": "836651553127",
"ap-northeast-1": "411782140378",
"us-west-2": "153931337802",
"us-west-1": "667973535471",
"ap-southeast-1": "759080221371",
"ap-southeast-2": "440695851116",
"ca-central-1": "446299261295",
"cn-north-1": "671472414489",
"cn-northwest-1": "844356804704",
"eu-south-1": "753923664805",
"af-south-1": "309385258863",
"us-gov-west-1": "271483468897"
},
"repository": "sagemaker-spark-processing"
}
}
}
}
20 changes: 14 additions & 6 deletions src/sagemaker/image_uris.py
@@ -19,6 +19,7 @@
import re

from sagemaker import utils
from sagemaker.spark import defaults

logger = logging.getLogger(__name__)

@@ -33,6 +34,7 @@ def retrieve(
instance_type=None,
accelerator_type=None,
image_scope=None,
container_version=None,
):
"""Retrieves the ECR URI for the Docker image matching the given arguments.

@@ -51,6 +53,7 @@ image_scope (str): The image type, i.e. what it is used for.
image_scope (str): The image type, i.e. what it is used for.
Valid values: "training", "inference", "eia". If ``accelerator_type`` is set,
``image_scope`` is ignored.
container_version (str): the version of the Docker image.

Returns:
str: the ECR URI for the corresponding SageMaker Docker image.
@@ -63,7 +66,7 @@ def retrieve(
version = _validate_version_and_set_if_needed(version, config, framework)
version_config = config["versions"][_version_for_config(version, config)]

py_version = _validate_py_version_and_set_if_needed(py_version, version_config)
py_version = _validate_py_version_and_set_if_needed(py_version, version_config, framework)
version_config = version_config.get(py_version) or version_config

registry = _registry_from_region(region, version_config["registries"])
@@ -74,7 +77,9 @@ def retrieve(
processor = _processor(
instance_type, config.get("processors") or version_config.get("processors")
)
tag = _format_tag(version_config.get("tag_prefix", version), processor, py_version)
tag = _format_tag(
version_config.get("tag_prefix", version), processor, py_version, container_version
)

if tag:
repo += ":{}".format(tag)
@@ -103,7 +108,7 @@ def _config_for_framework_and_scope(framework, image_scope, accelerator_type=Non
available_scopes[0],
image_scope,
)
image_scope = available_scopes[0]
image_scope = list(available_scopes)[0]

if not image_scope and "scope" in config and set(available_scopes) == {"training", "inference"}:
logger.info(
@@ -212,7 +217,7 @@ def _processor(instance_type, available_processors):
return processor


def _validate_py_version_and_set_if_needed(py_version, version_config):
def _validate_py_version_and_set_if_needed(py_version, version_config, framework):
"""Checks if the Python version is one of the supported versions."""
if "repository" in version_config:
available_versions = version_config.get("py_versions")
@@ -224,6 +229,9 @@ def _validate_py_version_and_set_if_needed(py_version, version_config):
logger.info("Ignoring unnecessary Python version: %s.", py_version)
return None

if py_version is None and defaults.SPARK_NAME == framework:
return None

if py_version is None and len(available_versions) == 1:
logger.info("Defaulting to only available Python version: %s", available_versions[0])
return available_versions[0]
@@ -242,6 +250,6 @@ def _validate_arg(arg, available_options, arg_name):
)


def _format_tag(tag_prefix, processor, py_version):
def _format_tag(tag_prefix, processor, py_version, container_version):
"""Creates a tag for the image URI."""
return "-".join([x for x in (tag_prefix, processor, py_version) if x])
return "-".join([x for x in (tag_prefix, processor, py_version, container_version) if x])
4 changes: 4 additions & 0 deletions src/sagemaker/processing.py
@@ -166,6 +166,10 @@ def run(
if wait:
self.latest_job.wait(logs=logs)

def _extend_processing_args(self, inputs, outputs, **kwargs): # pylint: disable=W0613
"""Extend inputs and outputs based on extra parameters"""
return inputs, outputs

def _normalize_args(self, job_name=None, arguments=None, inputs=None, outputs=None, code=None):
"""Normalizes the arguments so that they can be passed to the job run

16 changes: 16 additions & 0 deletions src/sagemaker/spark/__init__.py
@@ -0,0 +1,16 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Placeholder docstring"""
from __future__ import absolute_import

from sagemaker.spark.processing import PySparkProcessor, SparkJarProcessor # noqa: F401
16 changes: 16 additions & 0 deletions src/sagemaker/spark/defaults.py
@@ -0,0 +1,16 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Default constants used by Spark processing."""
from __future__ import absolute_import

SPARK_NAME = "spark"