breaking: rename distributions to distribution in TF/MXNet estimators #1662

Merged
merged 2 commits on Jul 2, 2020
2 changes: 1 addition & 1 deletion doc/frameworks/mxnet/using_mxnet.rst
@@ -200,7 +200,7 @@ If you want to use parameter servers for distributed training, set the following

.. code:: python

distributions={'parameter_server': {'enabled': True}}
distribution={'parameter_server': {'enabled': True}}

Then, when writing a distributed training script, use an MXNet kvstore to store and share model parameters.
During training, Amazon SageMaker automatically starts an MXNet kvstore server and scheduler processes on hosts in your training job cluster.
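
For illustration, a bare-bones kvstore interaction inside the training script might look like the following (a hedged sketch: the ``dist_sync`` store type, the key, and the parameter shape are illustrative choices, not something this change prescribes).

.. code:: python

    import mxnet as mx

    # SageMaker launches the kvstore server and scheduler processes; the
    # training script only creates a distributed store and pushes/pulls values.
    kv = mx.kvstore.create('dist_sync')

    shape = (2, 3)
    kv.init(3, mx.nd.ones(shape))       # register a parameter under key 3
    kv.push(3, mx.nd.ones(shape) * 2)   # send a local update
    out = mx.nd.zeros(shape)
    kv.pull(3, out=out)                 # fetch the aggregated value
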
10 changes: 5 additions & 5 deletions doc/frameworks/tensorflow/using_tf.rst
@@ -212,12 +212,12 @@ Distributed Training

To run your training job with multiple instances in a distributed fashion, set ``train_instance_count``
to a number larger than 1. We support two different types of distributed training, parameter server and Horovod.
The ``distributions`` parameter is used to configure which distributed training strategy to use.
The ``distribution`` parameter is used to configure which distributed training strategy to use.

Training with parameter servers
-------------------------------

If you specify parameter_server as the value of the distributions parameter, the container launches a parameter server
If you specify parameter_server as the value of the distribution parameter, the container launches a parameter server
thread on each instance in the training cluster, and then executes your training code. You can find more information on
TensorFlow distributed training at `TensorFlow docs <https://www.tensorflow.org/deploy/distributed>`__.
To enable parameter server training:
@@ -229,7 +229,7 @@ To enable parameter server training:
tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
train_instance_count=2, train_instance_type='ml.p2.xlarge',
framework_version='1.11', py_version='py3',
distributions={'parameter_server': {'enabled': True}})
distribution={'parameter_server': {'enabled': True}})
tf_estimator.fit('s3://bucket/path/to/training/data')

Training with Horovod
@@ -241,7 +241,7 @@ You can find more details at `Horovod README <https://github.com/uber/horovod>`_
The container sets up the MPI environment and executes the ``mpirun`` command, enabling you to run any Horovod
training script.
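
A minimal Horovod entry point for such a job might look like the following (a hedged sketch; the exact TensorFlow integration, e.g. ``hvd.DistributedOptimizer``, depends on the TensorFlow and Horovod versions in use).

.. code:: python

    import horovod.tensorflow as hvd

    hvd.init()  # set up MPI; one process per GPU is the typical layout
    print('rank %d of %d (local rank %d)' % (hvd.rank(), hvd.size(), hvd.local_rank()))

    # Wrap the optimizer with hvd.DistributedOptimizer and broadcast the initial
    # variables from rank 0 so every worker starts from the same state.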

Training with ``MPI`` is configured by specifying following fields in ``distributions``:
Training with ``MPI`` is configured by specifying following fields in ``distribution``:

- ``enabled (bool)``: If set to ``True``, the MPI setup is performed and ``mpirun`` command is executed.
- ``processes_per_host (int)``: Number of processes MPI should launch on each host. Note, this should not be
@@ -260,7 +260,7 @@ In the below example we create an estimator to launch Horovod distributed training
tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
train_instance_count=1, train_instance_type='ml.p3.8xlarge',
framework_version='2.1.0', py_version='py3',
distributions={
distribution={
'mpi': {
'enabled': True,
'processes_per_host': 4,
1 change: 1 addition & 0 deletions src/sagemaker/cli/compatibility/v2/ast_transformer.py
@@ -25,6 +25,7 @@
modifiers.tfs.TensorFlowServingConstructorRenamer(),
modifiers.predictors.PredictorConstructorRefactor(),
modifiers.airflow.ModelConfigArgModifier(),
modifiers.estimators.DistributionParameterRenamer(),
]

IMPORT_MODIFIERS = [modifiers.tfs.TensorFlowServingImportRenamer()]
1 change: 1 addition & 0 deletions src/sagemaker/cli/compatibility/v2/modifiers/__init__.py
@@ -16,6 +16,7 @@
from sagemaker.cli.compatibility.v2.modifiers import ( # noqa: F401 (imported but unused)
airflow,
deprecated_params,
estimators,
framework_version,
predictors,
tf_legacy_mode,
72 changes: 72 additions & 0 deletions src/sagemaker/cli/compatibility/v2/modifiers/estimators.py
@@ -0,0 +1,72 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Classes to modify Predictor code to be compatible
with version 2.0 and later of the SageMaker Python SDK.
"""
from __future__ import absolute_import

from sagemaker.cli.compatibility.v2.modifiers import matching
from sagemaker.cli.compatibility.v2.modifiers.modifier import Modifier

ESTIMATORS_WITH_DISTRIBUTION_PARAM = {
"TensorFlow": ("sagemaker.tensorflow", "sagemaker.tensorflow.estimator"),
"MXNet": ("sagemaker.mxnet", "sagemaker.mxnet.estimator"),
}


class DistributionParameterRenamer(Modifier):
"""A class to rename the ``distributions`` attribute in MXNet and TensorFlow estimators."""

def node_should_be_modified(self, node):
"""Checks if the ``ast.Call`` node instantiates an MXNet or TensorFlow estimator and
contains the ``distributions`` parameter.

This looks for the following calls:

- ``<Framework>``
- ``sagemaker.<framework>.<Framework>``
- ``sagemaker.<framework>.estimator.<Framework>``

where ``<Framework>`` is either ``TensorFlow`` or ``MXNet``.

Args:
node (ast.Call): a node that represents a function call. For more,
see https://docs.python.org/3/library/ast.html#abstract-grammar.

Returns:
bool: If the ``ast.Call`` instantiates an MXNet or TensorFlow estimator with
the ``distributions`` parameter.
"""
return matching.matches_any(
node, ESTIMATORS_WITH_DISTRIBUTION_PARAM
) and self._has_distribution_arg(node)

def _has_distribution_arg(self, node):
"""Checks if the node has the ``distributions`` parameter in its keywords."""
for kw in node.keywords:
if kw.arg == "distributions":
return True

return False

def modify_node(self, node):
"""Modifies the ``ast.Call`` node to rename the ``distributions`` attribute to
``distribution``.

Args:
node (ast.Call): a node that represents an MXNet or TensorFlow constructor.
"""
for kw in node.keywords:
if kw.arg == "distributions":

Review comment on lines +69 to +70 (Contributor):

@ignorable style: it may be overkill to abstract functionality from _has_distribution_arg and modify_node, something like:

def _distribution_arg(self, node):
    for kw in node.keywords:
        if kw.arg == "distributions":
            return kw
    return None

def _has_distribution_arg(self, node):
    return self._distribution_arg(node) is not None

def modify_node(self, node):
    kw = self._distribution_arg(node)
    kw.arg = "distribution"

Reply (Contributor Author):

hmm...it kind of feels like overkill in this context, but I could see potential in the arg retrieval as a standalone utility function that's then used with multiple modifier classes

kw.arg = "distribution"
break
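
For illustration, the renamer can be exercised directly against parsed source (a hedged sketch: the import path mirrors the module added in this change, and the bare ``TensorFlow(...)`` call is one of the forms the docstring says the matcher accepts).

import ast

from sagemaker.cli.compatibility.v2.modifiers.estimators import DistributionParameterRenamer

code = "TensorFlow(entry_point='train.py', distributions={'mpi': {'enabled': True}})"
call = ast.parse(code).body[0].value  # the ast.Call node for the constructor

renamer = DistributionParameterRenamer()
if renamer.node_should_be_modified(call):
    renamer.modify_node(call)

# The keyword argument is now named 'distribution'.
print(ast.dump(call))
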
8 changes: 4 additions & 4 deletions src/sagemaker/fw_utils.py
@@ -599,15 +599,15 @@ def later_framework_version_warning(latest_version):
return LATER_FRAMEWORK_VERSION_WARNING.format(latest=latest_version)


def warn_if_parameter_server_with_multi_gpu(training_instance_type, distributions):
def warn_if_parameter_server_with_multi_gpu(training_instance_type, distribution):
"""Warn the user that training will not fully leverage all the GPU
cores if parameter server is enabled and a multi-GPU instance is selected.
Distributed training with the default parameter server setup doesn't
support multi-GPU instances.

Args:
training_instance_type (str): A string representing the type of training instance selected.
distributions (dict): A dictionary with information to enable distributed training.
distribution (dict): A dictionary with information to enable distributed training.
(Defaults to None if distributed training is not enabled.) For example:

.. code:: python
@@ -621,15 +621,15 @@ def warn_if_parameter_server_with_multi_gpu(training_instance_type, distribution):


"""
if training_instance_type == "local" or distributions is None:
if training_instance_type == "local" or distribution is None:
return

is_multi_gpu_instance = (
training_instance_type == "local_gpu"
or training_instance_type.split(".")[1].startswith("p")
) and training_instance_type not in SINGLE_GPU_INSTANCE_TYPES

ps_enabled = "parameter_server" in distributions and distributions["parameter_server"].get(
ps_enabled = "parameter_server" in distribution and distribution["parameter_server"].get(
"enabled", False
)

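For illustration, a usage sketch of the renamed helper (assumptions: ``ml.p3.8xlarge`` is a multi-GPU type outside ``SINGLE_GPU_INSTANCE_TYPES``, so the warning fires; the exact warning text lives in ``fw_utils`` and is not reproduced here).

from sagemaker.fw_utils import warn_if_parameter_server_with_multi_gpu

# Warns that the default parameter server setup will not use all GPUs
# on a multi-GPU instance.
warn_if_parameter_server_with_multi_gpu(
    training_instance_type='ml.p3.8xlarge',
    distribution={'parameter_server': {'enabled': True}},
)
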
24 changes: 11 additions & 13 deletions src/sagemaker/mxnet/estimator.py
@@ -21,7 +21,6 @@
framework_version_from_tag,
is_version_equal_or_higher,
python_deprecation_warning,
parameter_v2_rename_warning,
validate_version_or_image_args,
warn_if_parameter_server_with_multi_gpu,
)
@@ -46,7 +45,7 @@ def __init__(
source_dir=None,
hyperparameters=None,
image_name=None,
distributions=None,
distribution=None,
**kwargs
):
"""This ``Estimator`` executes an MXNet script in a managed MXNet
@@ -100,7 +99,7 @@ def __init__(
If ``framework_version`` or ``py_version`` are ``None``, then
``image_name`` is required. If also ``None``, then a ``ValueError``
will be raised.
distributions (dict): A dictionary with information on how to run distributed
distribution (dict): A dictionary with information on how to run distributed
training (default: None). To have parameter servers launched for training,
set this value to be ``{'parameter_server': {'enabled': True}}``.
**kwargs: Additional kwargs passed to the
@@ -131,35 +130,34 @@ def __init__(
entry_point, source_dir, hyperparameters, image_name=image_name, **kwargs
)

if distributions is not None:
logger.warning(parameter_v2_rename_warning("distributions", "distribution"))
if distribution is not None:
train_instance_type = kwargs.get("train_instance_type")
warn_if_parameter_server_with_multi_gpu(
training_instance_type=train_instance_type, distributions=distributions
training_instance_type=train_instance_type, distribution=distribution
)

self._configure_distribution(distributions)
self._configure_distribution(distribution)

def _configure_distribution(self, distributions):
def _configure_distribution(self, distribution):
"""
Args:
distributions:
distribution:
"""
if distributions is None:
if distribution is None:
return

if (
self.framework_version
and self.framework_version.split(".") < self._LOWEST_SCRIPT_MODE_VERSION
):
raise ValueError(
"The distributions option is valid for only versions {} and higher".format(
"The distribution option is valid for only versions {} and higher".format(
".".join(self._LOWEST_SCRIPT_MODE_VERSION)
)
)

if "parameter_server" in distributions:
enabled = distributions["parameter_server"].get("enabled", False)
if "parameter_server" in distribution:
enabled = distribution["parameter_server"].get("enabled", False)
self._hyperparameters[self.LAUNCH_PS_ENV_NAME] = enabled

def create_model(
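For reference, a constructor call using the renamed parameter might look like the following (a hedged sketch; the entry point, role, instance settings, and framework version are placeholders).

from sagemaker.mxnet import MXNet

mx_estimator = MXNet(
    entry_point='train.py',
    role='SageMakerRole',
    train_instance_count=2,
    train_instance_type='ml.p2.xlarge',
    framework_version='1.6.0',
    py_version='py3',
    distribution={'parameter_server': {'enabled': True}},
)
mx_estimator.fit('s3://bucket/path/to/training/data')
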
21 changes: 10 additions & 11 deletions src/sagemaker/tensorflow/estimator.py
@@ -45,7 +45,7 @@ def __init__(
framework_version=None,
model_dir=None,
image_name=None,
distributions=None,
distribution=None,
**kwargs
):
"""Initialize a ``TensorFlow`` estimator.
@@ -81,7 +81,7 @@ def __init__(
If ``framework_version`` or ``py_version`` are ``None``, then
``image_name`` is required. If also ``None``, then a ``ValueError``
will be raised.
distributions (dict): A dictionary with information on how to run distributed training
distribution (dict): A dictionary with information on how to run distributed training
(default: None). Currently we support distributed training with parameter servers
and MPI.
To enable parameter server use the following setup:
@@ -122,11 +122,10 @@ def __init__(
self.framework_version = framework_version
self.py_version = py_version

if distributions is not None:
logger.warning(fw.parameter_v2_rename_warning("distribution", distributions))
if distribution is not None:
train_instance_type = kwargs.get("train_instance_type")
fw.warn_if_parameter_server_with_multi_gpu(
training_instance_type=train_instance_type, distributions=distributions
training_instance_type=train_instance_type, distribution=distribution
)

if "enable_sagemaker_metrics" not in kwargs:
@@ -137,7 +136,7 @@ def __init__(
super(TensorFlow, self).__init__(image_name=image_name, **kwargs)

self.model_dir = model_dir
self.distributions = distributions or {}
self.distribution = distribution or {}

self._validate_args(py_version=py_version)

@@ -295,13 +294,13 @@ def hyperparameters(self):
hyperparameters = super(TensorFlow, self).hyperparameters()
additional_hyperparameters = {}

if "parameter_server" in self.distributions:
ps_enabled = self.distributions["parameter_server"].get("enabled", False)
if "parameter_server" in self.distribution:
ps_enabled = self.distribution["parameter_server"].get("enabled", False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = ps_enabled

mpi_enabled = False
if "mpi" in self.distributions:
mpi_dict = self.distributions["mpi"]
if "mpi" in self.distribution:
mpi_dict = self.distribution["mpi"]
mpi_enabled = mpi_dict.get("enabled", False)
additional_hyperparameters[self.LAUNCH_MPI_ENV_NAME] = mpi_enabled

@@ -338,7 +337,7 @@ def _validate_and_set_debugger_configs(self):

Else, set default HookConfig
"""
ps_enabled = "parameter_server" in self.distributions and self.distributions[
ps_enabled = "parameter_server" in self.distribution and self.distribution[
"parameter_server"
].get("enabled", False)
if ps_enabled:
4 changes: 2 additions & 2 deletions tests/integ/test_horovod.py
@@ -82,7 +82,7 @@ def test_horovod_local_mode(
output_path=output_path,
framework_version=tf_training_latest_version,
py_version=tf_training_latest_py_version,
distributions={"mpi": {"enabled": True, "processes_per_host": processes}},
distribution={"mpi": {"enabled": True, "processes_per_host": processes}},
)

with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
@@ -128,7 +128,7 @@ def _create_and_fit_estimator(sagemaker_session, tf_version, py_version, instanc
sagemaker_session=sagemaker_session,
py_version=py_version,
framework_version=tf_version,
distributions={"mpi": {"enabled": True}},
distribution={"mpi": {"enabled": True}},
)

with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
2 changes: 1 addition & 1 deletion tests/integ/test_local_mode.py
@@ -169,7 +169,7 @@ def test_mxnet_distributed_local_mode(
train_instance_type="local",
sagemaker_session=sagemaker_local_session,
framework_version=mxnet_full_version,
distributions={"parameter_server": {"enabled": True}},
distribution={"parameter_server": {"enabled": True}},
)

train_input = mx.sagemaker_session.upload_data(
2 changes: 1 addition & 1 deletion tests/integ/test_mxnet_train.py
@@ -300,7 +300,7 @@ def test_async_fit(sagemaker_session, mxnet_full_version, mxnet_full_py_version,
train_instance_type=cpu_instance_type,
sagemaker_session=sagemaker_session,
framework_version=mxnet_full_version,
distributions={"parameter_server": {"enabled": True}},
distribution={"parameter_server": {"enabled": True}},
)

train_input = mx.sagemaker_session.upload_data(
2 changes: 1 addition & 1 deletion tests/integ/test_tf.py
@@ -134,7 +134,7 @@ def test_mnist_distributed(
sagemaker_session=sagemaker_session,
framework_version=tf_training_latest_version,
py_version=tf_training_latest_py_version,
distributions=PARAMETER_SERVER_DISTRIBUTION,
distribution=PARAMETER_SERVER_DISTRIBUTION,
)
inputs = estimator.sagemaker_session.upload_data(
path=os.path.join(MNIST_RESOURCE_PATH, "data"), key_prefix="scriptmode/distributed_mnist"