
Support of Horovod and TF 1.12 for TensorFlow Script Mode. TFS 1.12 support #567


Merged

69 commits merged on Jan 17, 2019
Changes from all commits
Commits
69 commits
1eb85ad
Add horovod support
icywang86rui Dec 6, 2018
c63fe06
Add newline at eof
icywang86rui Dec 6, 2018
b91208a
Do not skip integ test
icywang86rui Dec 6, 2018
a1b426a
Edit README to include distributed training with MPI
icywang86rui Dec 6, 2018
10fe7bf
PR comments
icywang86rui Dec 7, 2018
c5f68b1
Add processes_per_host and custom_mpi_options
icywang86rui Dec 7, 2018
858079e
Add missing period
icywang86rui Dec 7, 2018
ffc0812
Use distribution in README
icywang86rui Dec 7, 2018
7587e52
Use distributions in README
icywang86rui Dec 7, 2018
f1f8583
Fix README
icywang86rui Dec 7, 2018
64449a5
Improve documentation
yangaws Dec 17, 2018
c857afe
Address comments from Eric
yangaws Dec 18, 2018
245b75f
Merge remote-tracking branch 'origin/master' into horovod
mvsusp Dec 18, 2018
e3aeb6e
Updated TF version
mvsusp Dec 18, 2018
2bcd290
Fix empty mpi distribution use case
mvsusp Dec 18, 2018
3cfdc57
Add check for necessary files in model.tar.gz
yangaws Dec 19, 2018
561414f
Add benchmarks as submodule
mvsusp Dec 19, 2018
b5d4a1c
Add benchmarks as submodule
mvsusp Dec 19, 2018
7843392
Handle PR comments
mvsusp Dec 19, 2018
1c4e8c5
Update version
mvsusp Dec 19, 2018
41175a2
Handle PR comments
mvsusp Dec 20, 2018
c137be1
Run TF tests against latest container instead of default.
nadiaya Dec 20, 2018
d44d590
Merge branch 'wru-horovod' of github.com:mvsusp/sagemaker-python-sdk …
nadiaya Dec 20, 2018
05ee7c1
Merge branch 'master' into wru-horovod
yangaws Dec 20, 2018
1680073
Fix urllib.parse import errors for python 2.
nadiaya Dec 20, 2018
0232e4a
Merge branch 'wru-horovod' of github.com:mvsusp/sagemaker-python-sdk …
nadiaya Dec 20, 2018
c19021a
Fix horovod integ test tar file extract error
yangaws Dec 20, 2018
6722c07
Merge branch 'master' into wru-horovod
yangaws Dec 20, 2018
24a5d61
fix flake8
yangaws Dec 20, 2018
160e646
Removed unnecessary tests
mvsusp Dec 21, 2018
7f93812
Merge branch 'master' into wru-horovod
uditbhatia Jan 10, 2019
333ebf7
Removing duplicated/unused TF import
uditbhatia Jan 10, 2019
1f80caf
Merge branch 'master' into wru-horovod
uditbhatia Jan 10, 2019
a1ec1b4
Add horovod support
icywang86rui Dec 6, 2018
9e8d88a
Add newline at eof
icywang86rui Dec 6, 2018
fafc9bb
Do not skip integ test
icywang86rui Dec 6, 2018
df313d8
Edit README to include distributed training with MPI
icywang86rui Dec 6, 2018
3fd1bf0
PR comments
icywang86rui Dec 7, 2018
e3051da
Add processes_per_host and custom_mpi_options
icywang86rui Dec 7, 2018
ead6229
Add missing period
icywang86rui Dec 7, 2018
d41e163
Use distribution in README
icywang86rui Dec 7, 2018
2aff9fc
Use distributions in README
icywang86rui Dec 7, 2018
3915406
Fix README
icywang86rui Dec 7, 2018
a07c0d6
Improve documentation
yangaws Dec 17, 2018
308a31c
Address comments from Eric
yangaws Dec 18, 2018
56d6d07
Updated TF version
mvsusp Dec 18, 2018
3145ffd
Fix empty mpi distribution use case
mvsusp Dec 18, 2018
dd838ef
Add check for necessary files in model.tar.gz
yangaws Dec 19, 2018
15bfe00
Add benchmarks as submodule
mvsusp Dec 19, 2018
8e9734e
Add benchmarks as submodule
mvsusp Dec 19, 2018
b22671d
Handle PR comments
mvsusp Dec 19, 2018
20e906e
Update version
mvsusp Dec 19, 2018
430cd0a
Handle PR comments
mvsusp Dec 20, 2018
bd9c92d
Run TF tests against latest container instead of default.
nadiaya Dec 20, 2018
2fcdaea
Fix urllib.parse import errors for python 2.
nadiaya Dec 20, 2018
3d06e11
Fix horovod integ test tar file extract error
yangaws Dec 20, 2018
c78eb31
fix flake8
yangaws Dec 20, 2018
abce1dd
Removed unnecessary tests
mvsusp Dec 21, 2018
3342e94
Removing duplicated/unused TF import
uditbhatia Jan 10, 2019
cb7610f
Capitalizing the mpi_distribution ps_distribution constant
uditbhatia Jan 10, 2019
dca2173
resolving conflicts
uditbhatia Jan 10, 2019
f91b29c
Merge branch 'master' into wru-horovod
uditbhatia Jan 11, 2019
30995dd
Restoring version default to 1.12
uditbhatia Jan 11, 2019
7c93fdc
Accommodating the mvs PR comments
uditbhatia Jan 11, 2019
a8d2cb0
Updating changelog
uditbhatia Jan 11, 2019
55c1998
changing the TF_VERSION field to 1.11 from 1.12 in defaults.py
uditbhatia Jan 11, 2019
c913c40
Merge branch 'master' into wru-horovod
uditbhatia Jan 16, 2019
23d8074
Fixing flake 8 errors after merge from master and updating changelog
uditbhatia Jan 16, 2019
177f37f
Bumping up the python SDK version to 1.17.3 (as per instructions in M…
uditbhatia Jan 16, 2019
12 changes: 10 additions & 2 deletions CHANGELOG.rst
@@ -2,11 +2,13 @@
CHANGELOG
=========

1.17.3.dev
==========

1.17.3
======

* bug-fix: Handle StopIteration in CloudWatch Logs retrieval
* feature: Update EI TensorFlow latest version to 1.12
* feature: Support for Horovod

1.17.2
======
@@ -19,10 +21,15 @@ CHANGELOG
* enhancement: Workflow: Specify tasks from which training/tuning operator to transform/deploy in related operators
* feature: Supporting inter-container traffic encryption flag


1.17.0
======


* bug-fix: Workflow: Revert appending Airflow retry id to default job name
* feature: support for Tensorflow 1.12
* feature: support for Tensorflow Serving 1.12
* bug-fix: Revert appending Airflow retry id to default job name
* bug-fix: Session: don't allow get_execution_role() to return an ARN that's not a role but has "role" in the name
* bug-fix: Remove ``__all__`` from ``__init__.py`` files
* doc-fix: Add TFRecord split type to docs
@@ -35,6 +42,7 @@ CHANGELOG
* enhancement: Add Model.transformer()
* bug-fix: HyperparameterTuner: make ``include_cls_metadata`` default to ``False`` for everything except Frameworks


1.16.3
======

2 changes: 1 addition & 1 deletion src/sagemaker/__init__.py
@@ -39,4 +39,4 @@
from sagemaker.session import s3_input # noqa: F401
from sagemaker.session import get_execution_role # noqa: F401

__version__ = '1.17.2'
__version__ = '1.17.3'
3 changes: 3 additions & 0 deletions src/sagemaker/estimator.py
@@ -749,6 +749,9 @@ class Framework(EstimatorBase):

__framework_name__ = None
LAUNCH_PS_ENV_NAME = 'sagemaker_parameter_server_enabled'
LAUNCH_MPI_ENV_NAME = 'sagemaker_mpi_enabled'
MPI_NUM_PROCESSES_PER_HOST = 'sagemaker_mpi_num_of_processes_per_host'
MPI_CUSTOM_MPI_OPTIONS = 'sagemaker_mpi_custom_mpi_options'

def __init__(self, entry_point, source_dir=None, hyperparameters=None, enable_cloudwatch_metrics=False,
container_log_level=logging.INFO, code_location=None, image_name=None, dependencies=None, **kwargs):
72 changes: 66 additions & 6 deletions src/sagemaker/tensorflow/README.rst
@@ -207,6 +207,67 @@ After attaching, the estimator can be deployed as usual.

tf_estimator = TensorFlow.attach(training_job_name=training_job_name)

Distributed Training
''''''''''''''''''''

To run your training job with multiple instances in a distributed fashion, set ``train_instance_count``
to a number larger than 1. We support two types of distributed training: parameter server and Horovod.
The ``distributions`` parameter configures which distributed training strategy to use.

Training with parameter servers
"""""""""""""""""""""""""""""""

If you specify ``parameter_server`` as the value of the ``distributions`` parameter, the container launches a parameter server
thread on each instance in the training cluster and then executes your training code. You can find more information on
TensorFlow distributed training in the `TensorFlow docs <https://www.tensorflow.org/deploy/distributed>`__.
To enable parameter server training:

.. code:: python

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
train_instance_count=2, train_instance_type='ml.p2.xlarge',
framework_version='1.11', py_version='py3',
distributions={'parameter_server': {'enabled': True}})
tf_estimator.fit('s3://bucket/path/to/training/data')

Training with Horovod
"""""""""""""""""""""

Horovod is a distributed training framework based on MPI. You can find more details at `Horovod README <https://github.com/uber/horovod>`__.

The container sets up the MPI environment and executes the ``mpirun`` command, enabling you to run any Horovod
training script with Script Mode.

Training with ``MPI`` is configured by specifying the following fields in ``distributions``:

- ``enabled (bool)``: If set to ``True``, the MPI setup is performed and the ``mpirun`` command is executed.
- ``processes_per_host (int)``: Number of processes MPI should launch on each host. This should not be
  greater than the number of available slots on the selected instance type. Set this flag for
  multi-CPU/GPU training.
- ``custom_mpi_options (str)``: Any ``mpirun`` flags passed in this field are added to the ``mpirun``
  command executed by SageMaker to launch distributed Horovod training.


In the example below, we create an estimator that launches Horovod distributed training with two processes on one host:

.. code:: python

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
train_instance_count=1, train_instance_type='ml.p2.xlarge',
framework_version='1.12', py_version='py3',
distributions={
'mpi': {
'enabled': True,
'processes_per_host': 2,
'custom_mpi_options': '--NCCL_DEBUG INFO'
}
})
tf_estimator.fit('s3://bucket/path/to/training/data')
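For reference, the ``distributions`` settings above reach the training container as script-mode hyperparameters. The sketch below mirrors the mapping this PR adds to ``estimator.py`` (the hyperparameter names come from the new constants in the diff); the helper function itself is illustrative and not part of the SDK's public API.

```python
# Illustrative only: mirrors how TensorFlow.hyperparameters() translates the
# ``distributions`` argument into script-mode hyperparameters in this PR.
LAUNCH_PS_ENV_NAME = 'sagemaker_parameter_server_enabled'
LAUNCH_MPI_ENV_NAME = 'sagemaker_mpi_enabled'
MPI_NUM_PROCESSES_PER_HOST = 'sagemaker_mpi_num_of_processes_per_host'
MPI_CUSTOM_MPI_OPTIONS = 'sagemaker_mpi_custom_mpi_options'


def distribution_hyperparameters(distributions):
    hyperparameters = {}
    if 'parameter_server' in distributions:
        ps = distributions['parameter_server']
        hyperparameters[LAUNCH_PS_ENV_NAME] = ps.get('enabled', False)
    if 'mpi' in distributions:
        mpi = distributions['mpi']
        hyperparameters[LAUNCH_MPI_ENV_NAME] = mpi.get('enabled', False)
        hyperparameters[MPI_NUM_PROCESSES_PER_HOST] = mpi.get('processes_per_host', 1)
        hyperparameters[MPI_CUSTOM_MPI_OPTIONS] = mpi.get('custom_mpi_options', '')
    return hyperparameters


print(distribution_hyperparameters({'mpi': {'enabled': True, 'processes_per_host': 2}}))
```

Note that ``processes_per_host`` defaults to 1 and ``custom_mpi_options`` to an empty string when not specified.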

sagemaker.tensorflow.TensorFlow class
'''''''''''''''''''''''''''''''''''''

@@ -277,11 +338,10 @@ Optional:
- ``model_dir (str)`` Location where model data, checkpoint data, and TensorBoard checkpoints should be saved during training.
If not specified a S3 location will be generated under the training job's default bucket. And ``model_dir`` will be
passed in your training script as one of the command line arguments.
- ``distributions (dict)`` Configure your distrubtion strategy with this argument. For launching parameter server for
for distributed training, you must set ``distributions`` to ``{'parameter_server': {'enabled': True}}``
- ``distributions (dict)`` Configure your distribution strategy with this argument.

Training with Pipe Mode using PipeModeDataset
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Amazon SageMaker allows users to create training jobs using Pipe input mode.
With Pipe input mode, your dataset is streamed directly to your training instances instead of being downloaded first.
@@ -327,9 +387,9 @@ To run training job with Pipe input mode, pass in ``input_mode='Pipe'`` to your
from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train-with-pipemodedataset.py', role='SageMakerRole',
training_steps=10000, evaluation_steps=100,
train_instance_count=1, train_instance_type='ml.p2.xlarge',
framework_version='1.10.0', input_mode='Pipe')
training_steps=10000, evaluation_steps=100,
train_instance_count=1, train_instance_type='ml.p2.xlarge',
framework_version='1.10.0', input_mode='Pipe')

tf_estimator.fit('s3://bucket/path/to/training/data')

35 changes: 27 additions & 8 deletions src/sagemaker/tensorflow/estimator.py
@@ -201,14 +201,20 @@ def __init__(self, training_steps=None, evaluation_steps=None, checkpoint_path=N
script_mode (bool): If set to True, the estimator will use the Script Mode containers (default: False).
This will be ignored if py_version is set to 'py3'.
distributions (dict): A dictionary with information on how to run distributed training
    (default: None). Currently we support distributed training with parameter servers and MPI.
    To enable parameter server training, use the following setup:
    {
        'parameter_server':
        {
            'enabled': True
        }
    }
    To enable MPI:
    {
        'mpi':
        {
            'enabled': True
        }
    }
**kwargs: Additional kwargs passed to the Framework constructor.
"""
if framework_version is None:
@@ -420,13 +426,24 @@ def hyperparameters(self):
hyperparameters = super(TensorFlow, self).hyperparameters()

self.checkpoint_path = self.checkpoint_path or self._default_s3_path('checkpoints')
mpi_enabled = False

if self._script_mode_enabled():
self.model_dir = self.model_dir or self._default_s3_path('model')
additional_hyperparameters = {'model_dir': self.model_dir}
additional_hyperparameters = {}

if 'parameter_server' in self.distributions:
enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = enabled
ps_enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = ps_enabled

if 'mpi' in self.distributions:
mpi_dict = self.distributions['mpi']
mpi_enabled = mpi_dict.get('enabled', False)
additional_hyperparameters[self.LAUNCH_MPI_ENV_NAME] = mpi_enabled
additional_hyperparameters[self.MPI_NUM_PROCESSES_PER_HOST] = mpi_dict.get('processes_per_host', 1)
additional_hyperparameters[self.MPI_CUSTOM_MPI_OPTIONS] = mpi_dict.get('custom_mpi_options', '')

self.model_dir = self.model_dir or self._default_s3_path('model', mpi=mpi_enabled)
additional_hyperparameters['model_dir'] = self.model_dir
else:
additional_hyperparameters = {'checkpoint_path': self.checkpoint_path,
'training_steps': self.training_steps,
@@ -436,10 +453,12 @@ def hyperparameters(self):
hyperparameters.update(Framework._json_encode_hyperparameters(additional_hyperparameters))
return hyperparameters

def _default_s3_path(self, directory):
def _default_s3_path(self, directory, mpi=False):
local_code = get_config_value('local.local_code', self.sagemaker_session.config)
if self.sagemaker_session.local_mode and local_code:
return '/opt/ml/shared/{}'.format(directory)
elif mpi:
return '/opt/ml/model'
else:
return os.path.join(self.output_path, self._current_job_name, directory)
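To make the branching above concrete, here is a standalone sketch of the same default-path logic; ``output_path`` and ``job_name`` are stand-ins for the estimator attributes the real method reads. With MPI enabled (and local mode off), the default model directory becomes the container-local ``/opt/ml/model`` rather than a per-job S3 prefix.

```python
import posixpath

# Standalone sketch of the _default_s3_path change above; not the SDK method itself.
def default_s3_path(directory, output_path, job_name, mpi=False, local_mode=False):
    if local_mode:
        # local mode writes to a shared directory on the host
        return '/opt/ml/shared/{}'.format(directory)
    elif mpi:
        # MPI/Horovod training writes to the container-local model directory
        return '/opt/ml/model'
    else:
        # otherwise, a per-job prefix under the estimator's S3 output path
        return posixpath.join(output_path, job_name, directory)

print(default_s3_path('model', 's3://bucket/output', 'tf-job-1'))
# s3://bucket/output/tf-job-1/model
print(default_s3_path('model', 's3://bucket/output', 'tf-job-1', mpi=True))
# /opt/ml/model
```

``posixpath.join`` is used here because S3 URIs always use forward slashes, matching what ``os.path.join`` produces on the Linux training hosts.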

3 changes: 3 additions & 0 deletions tests/data/horovod/launcher.sh
@@ -0,0 +1,3 @@
#!/usr/bin/env bash

python benchmarks/scripts/tf_cnn_benchmarks/tf_cnn_benchmarks.py --num_batches=500 --model vgg16 --variable_update horovod --horovod_device gpu --use_fp16 --summary_verbosity 1 --save_summaries_steps 10 --train_dir /opt/ml/model --eval_dir /opt/ml/model --batch_size 32
11 changes: 11 additions & 0 deletions tests/data/horovod/test_hvd_basic.py
@@ -0,0 +1,11 @@
import json
import os
import horovod.tensorflow as hvd

hvd.init()

with open(os.path.join('/opt/ml/model/rank-%s' % hvd.rank()), 'w+') as f:
basic_info = {'rank': hvd.rank(), 'size': hvd.size()}

print(basic_info)
json.dump(basic_info, f)
103 changes: 103 additions & 0 deletions tests/integ/test_horovod.py
@@ -0,0 +1,103 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import json
import os
import tarfile
from six.moves.urllib.parse import urlparse

import boto3
import pytest

import tests.integ as integ
from sagemaker.tensorflow import TensorFlow
from tests.integ import timeout

horovod_dir = os.path.join(os.path.dirname(__file__), '..', 'data', 'horovod')


@pytest.mark.parametrize('instance_type', ['ml.c5.xlarge', 'ml.p3.2xlarge'])
def test_horovod(sagemaker_session, instance_type, tmpdir):

estimator = TensorFlow(entry_point=os.path.join(horovod_dir, 'test_hvd_basic.py'),
role='SageMakerRole',
train_instance_count=2,
train_instance_type=instance_type,
sagemaker_session=sagemaker_session,
py_version=integ.PYTHON_VERSION,
script_mode=True,
framework_version='1.12',
distributions={'mpi': {'enabled': True}},
base_job_name='test-tf-horovod')

with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
estimator.fit()

tmp = str(tmpdir)
extract_files_from_s3(estimator.model_data, tmp)

for rank in range(2):
assert read_json('rank-%s' % rank, tmp)['rank'] == rank


@pytest.mark.parametrize('instances, processes', [
    (1, 2),
    (2, 1),
    (2, 2)])
def test_horovod_local_mode(instances, processes, tmpdir):
def test_horovod_local_mode(instances, processes, tmpdir):
output_path = 'file://%s' % tmpdir

estimator = TensorFlow(entry_point=os.path.join(horovod_dir, 'test_hvd_basic.py'),
role='SageMakerRole',
train_instance_count=instances,
train_instance_type='local',
py_version=integ.PYTHON_VERSION,
script_mode=True,
output_path=output_path,
framework_version='1.12',
distributions={'mpi': {'enabled': True,
'processes_per_host': processes}},
base_job_name='test-tf-horovod')

with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
estimator.fit()

tmp = str(tmpdir)
extract_files(output_path.replace('file://', ''), tmp)

size = instances * processes

for rank in range(size):
assert read_json('rank-%s' % rank, tmp)['rank'] == rank


def extract_files(output_path, tmpdir):
with tarfile.open(os.path.join(output_path, 'model.tar.gz')) as tar:
tar.extractall(tmpdir)


def read_json(file, tmp):
with open(os.path.join(tmp, file)) as f:
return json.load(f)


def extract_files_from_s3(s3_url, tmpdir):
parsed_url = urlparse(s3_url)
s3 = boto3.resource('s3')

model = os.path.join(tmpdir, 'model')
s3.Bucket(parsed_url.netloc).download_file(parsed_url.path.lstrip('/'), model)

with tarfile.open(model, 'r') as tar_file:
tar_file.extractall(tmpdir)
5 changes: 3 additions & 2 deletions tests/integ/test_tf_script_mode.py
@@ -23,7 +23,8 @@

RESOURCE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'tensorflow_mnist')
SCRIPT = os.path.join(RESOURCE_PATH, 'mnist.py')
DISTRIBUTION_ENABLED = {'parameter_server': {'enabled': True}}
PARAMETER_SERVER_DISTRIBUTION = {'parameter_server': {'enabled': True}}
MPI_DISTRIBUTION = {'mpi': {'enabled': True}}


@pytest.fixture(scope='session', params=['ml.c5.xlarge', 'ml.p2.xlarge'])
@@ -62,7 +63,7 @@ def test_mnist_distributed(sagemaker_session, instance_type):
py_version=integ.PYTHON_VERSION,
script_mode=True,
framework_version='1.11',
distributions=DISTRIBUTION_ENABLED,
distributions=PARAMETER_SERVER_DISTRIBUTION,
base_job_name='test-tf-sm-mnist')
inputs = estimator.sagemaker_session.upload_data(
path=os.path.join(RESOURCE_PATH, 'data'),