[DO NOT MERGE] Enable distributed training with Horovod for TensorFlow Script Mode #529

Closed · wants to merge 16 commits · showing changes from 5 commits
1 change: 1 addition & 0 deletions src/sagemaker/estimator.py
@@ -734,6 +734,7 @@ class Framework(EstimatorBase):

__framework_name__ = None
LAUNCH_PS_ENV_NAME = 'sagemaker_parameter_server_enabled'
LAUNCH_MPI_ENV_NAME = 'sagemaker_mpi_enabled'

def __init__(self, entry_point, source_dir=None, hyperparameters=None, enable_cloudwatch_metrics=False,
container_log_level=logging.INFO, code_location=None, image_name=None, dependencies=None, **kwargs):
40 changes: 38 additions & 2 deletions src/sagemaker/tensorflow/README.rst
@@ -207,6 +207,43 @@ After attaching, the estimator can be deployed as usual.

tf_estimator = TensorFlow.attach(training_job_name=training_job_name)

Distributed Training
''''''''''''''''''''

To run your training job in a distributed fashion, you need to set ``train_instance_count`` to a number larger than 1.
We support two types of distributed training: parameter server and MPI. The ``distributions`` parameter is
Contributor: We support more than these two distributed training types. I guess the difference is that these two types require additional setup.

Contributor (author): I think it's possible for users to run other types of distributed training, but I wouldn't say those are supported. These two types are set up by our code, and we are going to support and maintain that code.

Contributor: I think we need to provide details about the two configuration options, ``custom_mpi_options`` and ``processes_per_host``. Here is the draft that I wrote:

Please see the distribution and Training with Horovod sections of https://github.com/uditbhatia/sagemaker-python-sdk/blob/horovod-documentation/src/sagemaker/tensorflow/README.rst

Please note that a couple of links are broken, as it is still a draft, but I hope this helps you.

Contributor: re: Marcio's initial concern - I think this would read better if the "supporting two types of distributed training" were attributed to ``distributions`` rather than "us" (aka SageMaker). So maybe change this to:

To run your training job in a distributed fashion you need to set train_instance_count to a number larger than 1. In addition, you will need to ensure that the correct processes are started during training. You can either do this yourself or use the distributions parameter.

The distributions parameter can be used for:

  • launching parameter server: blah blah blah explanation
  • using MPI: other explanation blah blah blah

used to configure which distributed training strategy to use.

To enable parameter server training:

.. code:: python

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
                          train_instance_count=2, train_instance_type='ml.p2.xlarge',
                          framework_version='1.11', py_version='py3',
                          distributions={'parameter_server': {'enabled': True}})
tf_estimator.fit('s3://bucket/path/to/training/data')

If parameter server training is enabled, the container will launch a parameter server thread in each instance before executing your training code. You can find more information on TensorFlow distributed training `here <https://www.tensorflow.org/deploy/distributed>`__.

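Your training script is then responsible for making use of the cluster. As a minimal sketch (it is an assumption here, not something stated in this PR, that the Script Mode container describes the cluster through TensorFlow's standard ``TF_CONFIG`` environment variable), a script could inspect its role like this:

.. code:: python

import json
import os

# TF_CONFIG is TensorFlow's standard cluster description; whether and how
# the Script Mode container populates it is an assumption, not part of this PR.
tf_config = json.loads(os.environ.get('TF_CONFIG', '{}'))

cluster = tf_config.get('cluster', {})  # e.g. {'ps': [...], 'worker': [...]}
task = tf_config.get('task', {})        # e.g. {'type': 'worker', 'index': 0}

print('cluster spec: %s' % cluster)
print('this process is %s #%s' % (task.get('type', 'worker'), task.get('index', 0)))
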
To enable MPI training:

.. code:: python

from sagemaker.tensorflow import TensorFlow

tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
                          train_instance_count=2, train_instance_type='ml.p2.xlarge',
                          framework_version='1.11', py_version='py3',
                          distributions={'mpi': {'enabled': True}})
tf_estimator.fit('s3://bucket/path/to/training/data')

If MPI is enabled, the container will configure and execute ``mpirun`` with your training script. You can find more information on MPI and Horovod `here <https://github.com/uber/horovod>`__.

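The review discussion above also mentions additional MPI settings, ``processes_per_host`` and ``custom_mpi_options``, from the draft documentation. Assuming those key names are kept (they are not part of this diff), a configuration could look like this sketch:

.. code:: python

from sagemaker.tensorflow import TensorFlow

# 'processes_per_host' and 'custom_mpi_options' are taken from the draft
# documentation linked in the review comments; the exact keys may change
# before this is merged.
tf_estimator = TensorFlow(entry_point='tf-train.py', role='SageMakerRole',
                          train_instance_count=2, train_instance_type='ml.p2.xlarge',
                          framework_version='1.11', py_version='py3',
                          distributions={'mpi': {'enabled': True,
                                                 'processes_per_host': 1,
                                                 'custom_mpi_options': '-verbose'}})
tf_estimator.fit('s3://bucket/path/to/training/data')
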
sagemaker.tensorflow.TensorFlow class
'''''''''''''''''''''''''''''''''''''

@@ -277,8 +314,7 @@ Optional:
- ``model_dir (str)`` Location where model data, checkpoint data, and TensorBoard checkpoints should be saved during training.
If not specified a S3 location will be generated under the training job's default bucket. And ``model_dir`` will be
passed in your training script as one of the command line arguments.
- ``distributions (dict)`` Configure your distrubtion strategy with this argument. For launching parameter server for
for distributed training, you must set ``distributions`` to ``{'parameter_server': {'enabled': True}}``
- ``distributions (dict)`` Configure your distribution strategy with this argument.

Training with Pipe Mode using PipeModeDataset
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
33 changes: 25 additions & 8 deletions src/sagemaker/tensorflow/estimator.py
@@ -199,15 +199,22 @@ def __init__(self, training_steps=None, evaluation_steps=None, checkpoint_path=N
custom-image:latest.
script_mode (bool): If set to True will the estimator will use the Script Mode containers (default: False).
This will be ignored if py_version is set to 'py3'.
distribution (dict): A dictionary with information on how to run distributed training
(default: None). Currently we only support distributed training with parameter servers. To enable it
use the following setup:
distributions (dict): A dictionary with information on how to run distributed training
(default: None). Currently we support distributed training with parameter servers and MPI. To enable
parameter server use the following setup:
Contributor: s/server/servers

Contributor: for "To enable parameter server" - s/server/servers

{
'parameter_server':
{
'enabled': True
}
}
To enable MPI:
{
'mpi':
{
'enabled': True
}
}
**kwargs: Additional kwargs passed to the Framework constructor.
"""
if framework_version is None:
@@ -421,11 +428,19 @@ def hyperparameters(self):
self.checkpoint_path = self.checkpoint_path or self._default_s3_path('checkpoints')

if self._script_mode_enabled():
self.model_dir = self.model_dir or self._default_s3_path('model')
additional_hyperparameters = {'model_dir': self.model_dir}
additional_hyperparameters = {}

if 'parameter_server' in self.distributions:
enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = enabled
ps_enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = ps_enabled

mpi_enabled = False
if 'mpi' in self.distributions:
mpi_enabled = self.distributions['mpi'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_MPI_ENV_NAME] = mpi_enabled

self.model_dir = self.model_dir or self._default_s3_path('model', mpi=mpi_enabled)
additional_hyperparameters['model_dir'] = self.model_dir
else:
additional_hyperparameters = {'checkpoint_path': self.checkpoint_path,
'training_steps': self.training_steps,
@@ -435,10 +450,12 @@ def hyperparameters(self):
hyperparameters.update(Framework._json_encode_hyperparameters(additional_hyperparameters))
return hyperparameters

def _default_s3_path(self, directory):
def _default_s3_path(self, directory, mpi=False):
local_code = get_config_value('local.local_code', self.sagemaker_session.config)
if self.sagemaker_session.local_mode and local_code:
return '/opt/ml/shared/{}'.format(directory)
elif mpi:
return '/opt/ml/model'
Contributor: should we make this a constant?

else:
return os.path.join(self.output_path, self._current_job_name, directory)

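Picking up the reviewer's suggestion above, one possible shape for that change is sketched here; the constant name is illustrative and not part of this diff, and the snippet assumes the estimator module's existing context (``os``, ``get_config_value``, and the surrounding class):

# Sketch only: hoist the MPI model directory into a module-level constant.
_MPI_MODEL_DIR = '/opt/ml/model'  # illustrative name, not part of this PR

def _default_s3_path(self, directory, mpi=False):
    local_code = get_config_value('local.local_code', self.sagemaker_session.config)
    if self.sagemaker_session.local_mode and local_code:
        return '/opt/ml/shared/{}'.format(directory)
    elif mpi:
        return _MPI_MODEL_DIR
    else:
        return os.path.join(self.output_path, self._current_job_name, directory)
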
135 changes: 135 additions & 0 deletions tests/data/tensorflow_mnist/horovod_mnist.py
@@ -0,0 +1,135 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import argparse
import os

import tensorflow as tf
import horovod.tensorflow as hvd

layers = tf.contrib.layers
learn = tf.contrib.learn

tf.logging.set_verbosity(tf.logging.INFO)


def _parse_args():
parser = argparse.ArgumentParser()
# Data, model, and output directories
parser.add_argument('--output-data-dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
parser.add_argument('--model_dir', type=str)
Contributor: nit: it's strange to me that we would mix underscores and hyphens in our examples like this


return parser.parse_known_args()


def conv_model(feature, target, mode):
"""2-layer convolution model."""
# Convert the target to a one-hot tensor of shape (batch_size, 10) and
# with an on-value of 1 for each one-hot vector of length 10.
target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

# Reshape feature to a 4d tensor with 2nd and 3rd dimensions being
# image width and height, and the final dimension being the number of color channels.
feature = tf.reshape(feature, [-1, 28, 28, 1])

# First conv layer will compute 32 features for each 5x5 patch
with tf.variable_scope('conv_layer1'):
h_conv1 = layers.conv2d(
feature, 32, kernel_size=[5, 5], activation_fn=tf.nn.relu)
h_pool1 = tf.nn.max_pool(
h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

# Second conv layer will compute 64 features for each 5x5 patch.
with tf.variable_scope('conv_layer2'):
h_conv2 = layers.conv2d(
h_pool1, 64, kernel_size=[5, 5], activation_fn=tf.nn.relu)
h_pool2 = tf.nn.max_pool(
h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
# reshape tensor into a batch of vectors
h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

# Densely connected layer with 1024 neurons.
h_fc1 = layers.dropout(
layers.fully_connected(
h_pool2_flat, 1024, activation_fn=tf.nn.relu),
keep_prob=0.5,
is_training=mode == tf.contrib.learn.ModeKeys.TRAIN)

# Compute logits (1 per class) and compute loss.
logits = layers.fully_connected(h_fc1, 10, activation_fn=None)
loss = tf.losses.softmax_cross_entropy(target, logits)

return tf.argmax(logits, 1), loss


def main(_):
args, unknown = _parse_args()

# Horovod: initialize Horovod.
hvd.init()

# Download and load MNIST dataset.
mnist = learn.datasets.mnist.read_data_sets('MNIST-data-%d' % hvd.rank())
Contributor: What is the size of the dataset?

Contributor (author): The training data is about 164 MB; with the eval data and the labels, it's about 200 MB.


# Build model...
with tf.name_scope('input'):
image = tf.placeholder(tf.float32, [None, 784], name='image')
label = tf.placeholder(tf.float32, [None], name='label')
predict, loss = conv_model(image, label, tf.contrib.learn.ModeKeys.TRAIN)

# Horovod: adjust learning rate based on number of GPUs.
opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

# Horovod: add Horovod Distributed Optimizer.
opt = hvd.DistributedOptimizer(opt)

global_step = tf.contrib.framework.get_or_create_global_step()
train_op = opt.minimize(loss, global_step=global_step)

hooks = [
# Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
# from rank 0 to all other processes. This is necessary to ensure consistent
# initialization of all workers when training is started with random weights
# or restored from a checkpoint.
hvd.BroadcastGlobalVariablesHook(0),

tf.train.StopAtStepHook(last_step=200 // hvd.size()),

tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
every_n_iter=10),
]

# Horovod: pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Horovod: save checkpoints only on worker 0 to prevent other workers from
# corrupting them.
checkpoint_dir = os.path.join(args.model_dir, 'checkpoints') if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
hooks=hooks,
config=config) as mon_sess:
while not mon_sess.should_stop():
# Run a training step synchronously.
image_, label_ = mnist.train.next_batch(100)
mon_sess.run(train_op, feed_dict={image: image_, label: label_})


if __name__ == "__main__":
tf.app.run()
23 changes: 23 additions & 0 deletions tests/integ/test_tf_script_mode.py
@@ -24,6 +24,7 @@
RESOURCE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'tensorflow_mnist')
SCRIPT = os.path.join(RESOURCE_PATH, 'mnist.py')
DISTRIBUTION_ENABLED = {'parameter_server': {'enabled': True}}
DISTRIBUTION_MPI_ENABLED = {'mpi': {'enabled': True}}


@pytest.fixture(scope='session', params=['ml.c5.xlarge', 'ml.p2.xlarge'])
@@ -74,6 +75,28 @@ def test_mnist_distributed(sagemaker_session, instance_type):
['graph.pbtxt', 'model.ckpt-0.index', 'model.ckpt-0.meta', 'saved_model.pb'])


@pytest.mark.skipif(integ.PYTHON_VERSION != 'py3', reason="Script Mode tests are only configured to run with Python 3")
Contributor: single quotes for strings

Contributor: single quotes for the reason string

def test_mnist_horovod_distributed(sagemaker_session, instance_type):
estimator = TensorFlow(entry_point=os.path.join(RESOURCE_PATH, 'horovod_mnist.py'),
role='SageMakerRole',
train_instance_count=2,
train_instance_type=instance_type,
sagemaker_session=sagemaker_session,
py_version=integ.PYTHON_VERSION,
script_mode=True,
framework_version='1.11',
distributions=DISTRIBUTION_MPI_ENABLED,
base_job_name='test-tf-sm-horovod-mnist')
inputs = estimator.sagemaker_session.upload_data(
path=os.path.join(RESOURCE_PATH, 'data'),
key_prefix='scriptmode/distributed_mnist')

with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
estimator.fit(inputs)

# TODO: Add an assertion that model.tar.gz contains the checkpoint files.

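One way the TODO above could eventually be implemented is sketched below; the helper name is invented here, and the expected checkpoint file names depend on the training script, so treat this as an assumption rather than part of the PR:

import os
import tarfile
import tempfile

import boto3
from six.moves.urllib.parse import urlparse


def _assert_files_in_model_tar(model_data_url, expected_files):
    # model_data_url would be estimator.model_data,
    # e.g. 's3://bucket/job-name/output/model.tar.gz'.
    parsed_url = urlparse(model_data_url)
    s3 = boto3.client('s3')

    with tempfile.NamedTemporaryFile(suffix='.tar.gz') as tmp:
        s3.download_fileobj(parsed_url.netloc, parsed_url.path.lstrip('/'), tmp)
        tmp.seek(0)
        with tarfile.open(fileobj=tmp, mode='r:gz') as tar:
            names = {os.path.basename(member.name) for member in tar.getmembers()}

    for expected_file in expected_files:
        assert expected_file in names, 'missing {} in model.tar.gz'.format(expected_file)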

def _assert_s3_files_exist(s3_url, files):
parsed_url = urlparse(s3_url)
s3 = boto3.client('s3')
35 changes: 31 additions & 4 deletions tests/unit/test_tf_estimator.py
@@ -46,6 +46,7 @@
IMAGE_URI_FORMAT_STRING = "520713654638.dkr.ecr.{}.amazonaws.com/{}:{}-{}-{}"
SCRIPT_MODE_REPO_NAME = 'sagemaker-tensorflow-scriptmode'
DISTRIBUTION_ENABLED = {'parameter_server': {'enabled': True}}
DISTRIBUTION_MPI_ENABLED = {'mpi': {'enabled': True}}


@pytest.fixture()
@@ -72,7 +73,7 @@ def _get_full_cpu_image_uri_with_ei(version):
return _get_full_cpu_image_uri(version, repo='{}-eia'.format(IMAGE_REPO_NAME))


def _hyperparameters(script_mode=False):
def _hyperparameters(script_mode=False, horovod=False):
job_name = SM_JOB_NAME if script_mode else JOB_NAME
hps = {
'sagemaker_program': json.dumps('dummy_script.py'),
@@ -84,7 +85,10 @@ def _hyperparameters(script_mode=False):
'sagemaker_region': json.dumps('us-west-2')
}
if script_mode:
hps['model_dir'] = json.dumps('s3://{}/{}/model'.format(BUCKET_NAME, job_name))
if horovod:
hps['model_dir'] = json.dumps('/opt/ml/model')
else:
hps['model_dir'] = json.dumps('s3://{}/{}/model'.format(BUCKET_NAME, job_name))
else:
hps['checkpoint_path'] = json.dumps('s3://{}/{}/checkpoints'.format(BUCKET_NAME, job_name))
hps['training_steps'] = '1000'
@@ -93,7 +97,7 @@
return hps


def _create_train_job(tf_version, script_mode=False, repo_name=IMAGE_REPO_NAME, py_version='py2'):
def _create_train_job(tf_version, script_mode=False, horovod=False, repo_name=IMAGE_REPO_NAME, py_version='py2'):
return {
'image': _get_full_cpu_image_uri(tf_version, repo=repo_name, py_version=py_version),
'input_mode': 'File',
@@ -118,7 +122,7 @@ def _create_train_job(tf_version, script_mode=False, repo_name=IMAGE_REPO_NAME,
'InstanceCount': 1,
'VolumeSizeInGB': 30,
},
'hyperparameters': _hyperparameters(script_mode),
'hyperparameters': _hyperparameters(script_mode, horovod),
'stop_condition': {
'MaxRuntimeInSeconds': 24 * 60 * 60
},
@@ -790,3 +794,26 @@ def test_tf_script_mode_ps(time, strftime, sagemaker_session):

actual_train_args = sagemaker_session.method_calls[0][2]
assert actual_train_args == expected_train_args


@patch('time.strftime', return_value=TIMESTAMP)
@patch('time.time', return_value=TIME)
@patch('sagemaker.utils.create_tar_file', MagicMock())
def test_tf_script_mode_mpi(time, strftime, sagemaker_session):
tf = TensorFlow(entry_point=SCRIPT_FILE, role=ROLE, sagemaker_session=sagemaker_session, py_version='py3',
train_instance_type=INSTANCE_TYPE, train_instance_count=1, framework_version='1.11',
source_dir=DATA_DIR, distributions=DISTRIBUTION_MPI_ENABLED)

inputs = 's3://mybucket/train'
tf.fit(inputs=inputs)

call_names = [c[0] for c in sagemaker_session.method_calls]
assert call_names == ['train', 'logs_for_job']

expected_train_args = _create_train_job('1.11', script_mode=True, horovod=True,
repo_name=SM_IMAGE_REPO_NAME, py_version='py3')
expected_train_args['input_config'][0]['DataSource']['S3DataSource']['S3Uri'] = inputs
expected_train_args['hyperparameters'][TensorFlow.LAUNCH_MPI_ENV_NAME] = json.dumps(True)

actual_train_args = sagemaker_session.method_calls[0][2]
assert actual_train_args == expected_train_args