[DO NOT MERGE] Enable distributed training with Horovod for TensorFlow Script Mode #529
Changes from 5 commits
@@ -199,15 +199,22 @@ def __init__(self, training_steps=None, evaluation_steps=None, checkpoint_path=N
custom-image:latest.
script_mode (bool): If set to True, the estimator will use the Script Mode containers (default: False).
This will be ignored if py_version is set to 'py3'.
distribution (dict): A dictionary with information on how to run distributed training
(default: None). Currently we only support distributed training with parameter servers. To enable it
use the following setup:
distributions (dict): A dictionary with information on how to run distributed training
(default: None). Currently we support distributed training with parameter servers and MPI. To enable
parameter server use the following setup:
Review comment: s/server/servers

Review comment: for "To enable parameter server" - s/server/servers
{
'parameter_server':
{
'enabled': True
}
}
To enable MPI:
{
'mpi':
{
'enabled': True
}
}
**kwargs: Additional kwargs passed to the Framework constructor.
"""
if framework_version is None:
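For readers skimming the diff, here is a hedged sketch of how a caller would pass the new `distributions` argument once this change is in. The entry point, role name, instance settings, and S3 URI below are placeholders, not values from this PR; the `distributions` structure and `script_mode` flag follow the docstring above.

```python
# Illustrative usage only; 'train.py', the role, instance settings, and the S3 URI are placeholders.
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(entry_point='train.py',
                       role='SageMakerRole',
                       train_instance_count=2,
                       train_instance_type='ml.p2.xlarge',
                       framework_version='1.11',
                       py_version='py3',
                       script_mode=True,
                       distributions={'mpi': {'enabled': True}})

# estimator.fit('s3://my-bucket/my-training-data')  # placeholder bucket/prefix
```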
@@ -421,11 +428,19 @@ def hyperparameters(self):
self.checkpoint_path = self.checkpoint_path or self._default_s3_path('checkpoints')

if self._script_mode_enabled():
self.model_dir = self.model_dir or self._default_s3_path('model')
additional_hyperparameters = {'model_dir': self.model_dir}
additional_hyperparameters = {}

if 'parameter_server' in self.distributions:
enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = enabled
ps_enabled = self.distributions['parameter_server'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_PS_ENV_NAME] = ps_enabled

mpi_enabled = False
if 'mpi' in self.distributions:
mpi_enabled = self.distributions['mpi'].get('enabled', False)
additional_hyperparameters[self.LAUNCH_MPI_ENV_NAME] = mpi_enabled

self.model_dir = self.model_dir or self._default_s3_path('model', mpi=mpi_enabled)
additional_hyperparameters['model_dir'] = self.model_dir
else:
additional_hyperparameters = {'checkpoint_path': self.checkpoint_path,
'training_steps': self.training_steps,
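As a cross-check on the branching above, here is a small self-contained sketch of the same mapping in isolation. The two flag names are assumptions standing in for TensorFlow.LAUNCH_PS_ENV_NAME and TensorFlow.LAUNCH_MPI_ENV_NAME, and the function name is invented for illustration.

```python
# Standalone sketch of the distributions-to-hyperparameters mapping added above.
# The flag names below are assumed placeholders for the LAUNCH_*_ENV_NAME constants.
LAUNCH_PS_ENV_NAME = 'sagemaker_parameter_server_enabled'  # assumed
LAUNCH_MPI_ENV_NAME = 'sagemaker_mpi_enabled'              # assumed


def distribution_hyperparameters(distributions):
    """Return the launch flags derived from a 'distributions' dict."""
    flags = {}
    if 'parameter_server' in distributions:
        flags[LAUNCH_PS_ENV_NAME] = distributions['parameter_server'].get('enabled', False)
    if 'mpi' in distributions:
        flags[LAUNCH_MPI_ENV_NAME] = distributions['mpi'].get('enabled', False)
    return flags


print(distribution_hyperparameters({'mpi': {'enabled': True}}))
# {'sagemaker_mpi_enabled': True}
```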
@@ -435,10 +450,12 @@ def hyperparameters(self):
hyperparameters.update(Framework._json_encode_hyperparameters(additional_hyperparameters))
return hyperparameters

def _default_s3_path(self, directory):
def _default_s3_path(self, directory, mpi=False):
local_code = get_config_value('local.local_code', self.sagemaker_session.config)
if self.sagemaker_session.local_mode and local_code:
return '/opt/ml/shared/{}'.format(directory)
elif mpi:
return '/opt/ml/model'
Review comment: should we make this a constant?
else:
return os.path.join(self.output_path, self._current_job_name, directory)
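For reference, the change suggested by the review comment above might look roughly like this. The constant name _MPI_MODEL_DIR is invented here for illustration, and only the method body from the diff is shown (it relies on the surrounding class and the existing get_config_value import).

```python
# Hypothetical follow-up to the "make this a constant?" comment; _MPI_MODEL_DIR is an invented name.
_MPI_MODEL_DIR = '/opt/ml/model'


def _default_s3_path(self, directory, mpi=False):
    local_code = get_config_value('local.local_code', self.sagemaker_session.config)
    if self.sagemaker_session.local_mode and local_code:
        return '/opt/ml/shared/{}'.format(directory)
    elif mpi:
        return _MPI_MODEL_DIR
    return os.path.join(self.output_path, self._current_job_name, directory)
```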
@@ -0,0 +1,135 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import argparse
import os

import tensorflow as tf
import horovod.tensorflow as hvd

layers = tf.contrib.layers
learn = tf.contrib.learn

tf.logging.set_verbosity(tf.logging.INFO)


def _parse_args():
    parser = argparse.ArgumentParser()
    # Data, model, and output directories
    parser.add_argument('--output-data-dir', type=str, default=os.environ.get('SM_OUTPUT_DATA_DIR'))
    parser.add_argument('--model_dir', type=str)
Review comment: nit: it's strange to me that we would mix underscores and hyphens in our examples like this

    return parser.parse_known_args()


def conv_model(feature, target, mode):
    """2-layer convolution model."""
    # Convert the target to a one-hot tensor of shape (batch_size, 10) and
    # with an on-value of 1 for each one-hot vector of length 10.
    target = tf.one_hot(tf.cast(target, tf.int32), 10, 1, 0)

    # Reshape feature to a 4d tensor with 2nd and 3rd dimensions being
    # image width and height, final dimension being the number of color channels.
    feature = tf.reshape(feature, [-1, 28, 28, 1])

    # First conv layer will compute 32 features for each 5x5 patch
    with tf.variable_scope('conv_layer1'):
        h_conv1 = layers.conv2d(
            feature, 32, kernel_size=[5, 5], activation_fn=tf.nn.relu)
        h_pool1 = tf.nn.max_pool(
            h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')

    # Second conv layer will compute 64 features for each 5x5 patch.
    with tf.variable_scope('conv_layer2'):
        h_conv2 = layers.conv2d(
            h_pool1, 64, kernel_size=[5, 5], activation_fn=tf.nn.relu)
        h_pool2 = tf.nn.max_pool(
            h_conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
        # reshape tensor into a batch of vectors
        h_pool2_flat = tf.reshape(h_pool2, [-1, 7 * 7 * 64])

    # Densely connected layer with 1024 neurons.
    h_fc1 = layers.dropout(
        layers.fully_connected(
            h_pool2_flat, 1024, activation_fn=tf.nn.relu),
        keep_prob=0.5,
        is_training=mode == tf.contrib.learn.ModeKeys.TRAIN)

    # Compute logits (1 per class) and compute loss.
    logits = layers.fully_connected(h_fc1, 10, activation_fn=None)
    loss = tf.losses.softmax_cross_entropy(target, logits)

    return tf.argmax(logits, 1), loss


def main(_):
    args, unknown = _parse_args()

    # Horovod: initialize Horovod.
    hvd.init()

    # Download and load MNIST dataset.
    mnist = learn.datasets.mnist.read_data_sets('MNIST-data-%d' % hvd.rank())
Review comment: What is the size of the dataset?
Reply: the training data is 164M. With the eval data and the label it's about 200M.

    # Build model...
    with tf.name_scope('input'):
        image = tf.placeholder(tf.float32, [None, 784], name='image')
        label = tf.placeholder(tf.float32, [None], name='label')
    predict, loss = conv_model(image, label, tf.contrib.learn.ModeKeys.TRAIN)

    # Horovod: adjust learning rate based on number of GPUs.
    opt = tf.train.RMSPropOptimizer(0.001 * hvd.size())

    # Horovod: add Horovod Distributed Optimizer.
    opt = hvd.DistributedOptimizer(opt)

    global_step = tf.contrib.framework.get_or_create_global_step()
    train_op = opt.minimize(loss, global_step=global_step)

    hooks = [
        # Horovod: BroadcastGlobalVariablesHook broadcasts initial variable states
        # from rank 0 to all other processes. This is necessary to ensure consistent
        # initialization of all workers when training is started with random weights
        # or restored from a checkpoint.
        hvd.BroadcastGlobalVariablesHook(0),

        tf.train.StopAtStepHook(last_step=200 // hvd.size()),

        tf.train.LoggingTensorHook(tensors={'step': global_step, 'loss': loss},
                                   every_n_iter=10),
    ]

    # Horovod: pin GPU to be used to process local rank (one GPU per process)
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())

    # Horovod: save checkpoints only on worker 0 to prevent other workers from
    # corrupting them.
    checkpoint_dir = os.path.join(args.model_dir, 'checkpoints') if hvd.rank() == 0 else None

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                           hooks=hooks,
                                           config=config) as mon_sess:
        while not mon_sess.should_stop():
            # Run a training step synchronously.
            image_, label_ = mnist.train.next_batch(100)
            mon_sess.run(train_op, feed_dict={image: image_, label: label_})


if __name__ == "__main__":
    tf.app.run()
@@ -24,6 +24,7 @@
RESOURCE_PATH = os.path.join(os.path.dirname(__file__), '..', 'data', 'tensorflow_mnist')
SCRIPT = os.path.join(RESOURCE_PATH, 'mnist.py')
DISTRIBUTION_ENABLED = {'parameter_server': {'enabled': True}}
DISTRIBUTION_MPI_ENABLED = {'mpi': {'enabled': True}}


@pytest.fixture(scope='session', params=['ml.c5.xlarge', 'ml.p2.xlarge'])

@@ -74,6 +75,28 @@ def test_mnist_distributed(sagemaker_session, instance_type):
    ['graph.pbtxt', 'model.ckpt-0.index', 'model.ckpt-0.meta', 'saved_model.pb'])


@pytest.mark.skipif(integ.PYTHON_VERSION != 'py3', reason="Script Mode tests are only configured to run with Python 3")
Review comment: single quotes for strings
Review comment: single quotes for the reason string
def test_mnist_horovod_distributed(sagemaker_session, instance_type):
    estimator = TensorFlow(entry_point=os.path.join(RESOURCE_PATH, 'horovod_mnist.py'),
                           role='SageMakerRole',
                           train_instance_count=2,
                           train_instance_type=instance_type,
                           sagemaker_session=sagemaker_session,
                           py_version=integ.PYTHON_VERSION,
                           script_mode=True,
                           framework_version='1.11',
                           distributions=DISTRIBUTION_MPI_ENABLED,
                           base_job_name='test-tf-sm-horovod-mnist')
    inputs = estimator.sagemaker_session.upload_data(
        path=os.path.join(RESOURCE_PATH, 'data'),
        key_prefix='scriptmode/distributed_mnist')

    with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
        estimator.fit(inputs)

    # TODO: Add an assertion that model.tar.gz contains the checkpoint files.
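The TODO above could eventually be filled in along these lines. This is a hedged sketch only: the helper name, the use of a temporary directory, and the expected file names are assumptions, and it presumes the model artifact URL comes from something like estimator.model_data.

```python
# Hypothetical sketch for the TODO above; helper name and expected files are assumptions.
import os
import tarfile
import tempfile

import boto3
from six.moves.urllib.parse import urlparse


def _assert_checkpoints_in_model_tar(model_data_url, expected_files):
    parsed_url = urlparse(model_data_url)
    with tempfile.TemporaryDirectory() as tmp:  # Python 3 only, matching the py3-only test
        local_tar = os.path.join(tmp, 'model.tar.gz')
        boto3.client('s3').download_file(parsed_url.netloc, parsed_url.path.lstrip('/'), local_tar)
        with tarfile.open(local_tar) as tar:
            names = tar.getnames()
        for expected in expected_files:
            assert any(name.endswith(expected) for name in names)
```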

def _assert_s3_files_exist(s3_url, files):
    parsed_url = urlparse(s3_url)
    s3 = boto3.client('s3')
Review comment: We support more than these two distributed training types. I guess the difference is that these two types require additional setup.

Reply: I think it's possible for users to run other types of distributed training, but I wouldn't say those are supported. These two types are set up by our code, and we are going to support and maintain that code.

Review comment: I think we need to provide details about the two configuration options, custom_mpi_options and processes_per_host. Here is the draft that I wrote: please see the distribution and Training with Horovod sections of https://github.com/uditbhatia/sagemaker-python-sdk/blob/horovod-documentation/src/sagemaker/tensorflow/README.rst. Please note a couple of links are broken as it is still a draft, but I hope this helps you.

Review comment: re: Marcio's initial concern - I think this would read better if the "supporting two types of distributed training" were attributed to distributions rather than "us" (aka SageMaker). So maybe change this to:
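The reviewer's suggested rewording itself is not included in this excerpt. For reference, the fuller MPI configuration the thread refers to (with custom_mpi_options and processes_per_host) would look roughly like the sketch below; the values shown are placeholders, and the key names follow the draft README linked above.

```python
# Sketch of the fuller MPI configuration mentioned in the thread; values are placeholders.
distributions = {
    'mpi': {
        'enabled': True,
        'processes_per_host': 2,                    # MPI processes started on each instance
        'custom_mpi_options': '--NCCL_DEBUG INFO'   # extra options passed through to mpirun
    }
}
```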