Skip to content

Support MXNet 1.3 with its training script format changes #446

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 23 commits into from
Nov 5, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
CHANGELOG
=========

1.13.0
======

* feature: Estimators: add support for MXNet 1.3.0, including the addition of ``launch_parameter_server`` for use with the new training script format
* feature: Documentation: add explanation for the new training script format used with MXNet

1.12.0
======

Expand Down
2 changes: 1 addition & 1 deletion src/sagemaker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,4 @@
from sagemaker.session import s3_input # noqa: F401
from sagemaker.session import get_execution_role # noqa: F401

__version__ = '1.12.0'
__version__ = '1.13.0'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we bump it to 2.x since this is a breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well, this PR doesn't technically have breaking changes because we're not bumping the default version of MXNet. I was going to wait until the PR that makes framework_version required.

263 changes: 164 additions & 99 deletions src/sagemaker/mxnet/README.rst

Large diffs are not rendered by default.

26 changes: 21 additions & 5 deletions src/sagemaker/mxnet/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
class MXNet(Framework):
"""Handle end-to-end training and deployment of custom MXNet code."""

__framework_name__ = "mxnet"
__framework_name__ = 'mxnet'

LOWEST_SCRIPT_MODE_VERSION = ['1', '3']
LAUNCH_PS_ENV_NAME = 'sagemaker_mxnet_launch_parameter_server'

def __init__(self, entry_point, source_dir=None, hyperparameters=None, py_version='py2',
framework_version=None, image_name=None, **kwargs):
framework_version=None, image_name=None, launch_parameter_server=False, **kwargs):
"""
This ``Estimator`` executes an MXNet script in a managed MXNet execution environment, within a SageMaker
Training Job. The managed MXNet environment is an Amazon-built Docker container that executes functions
Expand Down Expand Up @@ -64,15 +67,28 @@ def __init__(self, entry_point, source_dir=None, hyperparameters=None, py_versio
Examples:
123.dkr.ecr.us-west-2.amazonaws.com/my-custom-image:1.0
custom-image:latest.
launch_parameter_server (bool): Whether or not to launch the default parameter server
implementation for use with distributed training (default: False). Valid for only
versions 1.3 and higher of MXNet.
**kwargs: Additional kwargs passed to the :class:`~sagemaker.estimator.Framework` constructor.
"""
if framework_version is None:
logger.warning(empty_framework_version_warning(MXNET_VERSION))
self.framework_version = framework_version or MXNET_VERSION

if self._script_mode_version():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still launch the parameter server with single host training?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, one can still use the kvstore needed even with only one host

hyperparameters = hyperparameters or {}
hyperparameters[self.LAUNCH_PS_ENV_NAME] = launch_parameter_server
else:
if launch_parameter_server:
raise ValueError('launch_parameter_server is used for only versions 1.3 and higher')

super(MXNet, self).__init__(entry_point, source_dir, hyperparameters,
image_name=image_name, **kwargs)
self.py_version = py_version

if framework_version is None:
logger.warning(empty_framework_version_warning(MXNET_VERSION))
self.framework_version = framework_version or MXNET_VERSION
def _script_mode_version(self):
return self.framework_version.split('.') >= self.LOWEST_SCRIPT_MODE_VERSION

def create_model(self, model_server_workers=None, role=None, vpc_config_override=VPC_CONFIG_DEFAULT):
"""Create a SageMaker ``MXNetModel`` object that can be deployed to an ``Endpoint``.
Expand Down
6 changes: 3 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
from sagemaker.local import LocalSession
from sagemaker.chainer.defaults import CHAINER_VERSION
from sagemaker.pytorch.defaults import PYTORCH_VERSION
from sagemaker.mxnet.defaults import MXNET_VERSION
from sagemaker.tensorflow.defaults import TF_VERSION


Expand All @@ -34,7 +33,7 @@ def pytest_addoption(parser):
parser.addoption('--sagemaker-runtime-config', action='store', default=None)
parser.addoption('--boto-config', action='store', default=None)
parser.addoption('--tf-full-version', action='store', default=TF_VERSION)
parser.addoption('--mxnet-full-version', action='store', default=MXNET_VERSION)
parser.addoption('--mxnet-full-version', action='store', default='1.3.0')
parser.addoption('--chainer-full-version', action='store', default=CHAINER_VERSION)
parser.addoption('--pytorch-full-version', action='store', default=PYTORCH_VERSION)

Expand Down Expand Up @@ -86,7 +85,8 @@ def tf_version(request):
return request.param


@pytest.fixture(scope='module', params=['0.12', '0.12.1', '1.0', '1.0.0', '1.1', '1.1.0', '1.2', '1.2.1'])
@pytest.fixture(scope='module', params=['0.12', '0.12.1', '1.0', '1.0.0', '1.1', '1.1.0', '1.2',
'1.2.1', '1.3', '1.3.0'])
def mxnet_version(request):
return request.param

Expand Down
6 changes: 3 additions & 3 deletions tests/data/mxnet_mnist/failure_script.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

def train(**kwargs):
"""For use with integration tests expecting failures."""
raise Exception('This failure is expected.')

# For use with integration tests expecting failures.
raise Exception('This failure is expected.')
81 changes: 65 additions & 16 deletions tests/data/mxnet_mnist/mnist.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
Expand All @@ -12,13 +10,17 @@
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import argparse
import gzip
import json
import logging
import os
import struct

import gzip
import mxnet as mx
import numpy as np
import os
import struct

from sagemaker_mxnet_container.training_utils import scheduler_host


def load_data(path):
Expand Down Expand Up @@ -56,23 +58,70 @@ def get_train_context(num_gpus):
return mx.cpu()


def train(channel_input_dirs, hyperparameters, hosts, num_gpus, **kwargs):
(train_labels, train_images) = load_data(os.path.join(channel_input_dirs['train']))
(test_labels, test_images) = load_data(os.path.join(channel_input_dirs['test']))
batch_size = 100
train_iter = mx.io.NDArrayIter(train_images, train_labels, batch_size, shuffle=True)
def train(batch_size, epochs, learning_rate, num_gpus, training_channel, testing_channel,
hosts, current_host, model_dir):
(train_labels, train_images) = load_data(training_channel)
(test_labels, test_images) = load_data(testing_channel)

# Data parallel training - shard the data so each host
# only trains on a subset of the total data.
shard_size = len(train_images) // len(hosts)
for i, host in enumerate(hosts):
if host == current_host:
start = shard_size * i
end = start + shard_size
break

train_iter = mx.io.NDArrayIter(train_images[start:end], train_labels[start:end], batch_size,
shuffle=True)
val_iter = mx.io.NDArrayIter(test_images, test_labels, batch_size)

logging.getLogger().setLevel(logging.DEBUG)

kvstore = 'local' if len(hosts) == 1 else 'dist_sync'
mlp_model = mx.mod.Module(
symbol=build_graph(),
context=get_train_context(num_gpus))

mlp_model = mx.mod.Module(symbol=build_graph(),
context=get_train_context(num_gpus))
mlp_model.fit(train_iter,
eval_data=val_iter,
kvstore=kvstore,
optimizer='sgd',
optimizer_params={'learning_rate': float(hyperparameters.get("learning_rate", 0.1))},
optimizer_params={'learning_rate': learning_rate},
eval_metric='acc',
batch_end_callback=mx.callback.Speedometer(batch_size, 100),
num_epoch=1)
return mlp_model
num_epoch=epochs)

if len(hosts) == 1 or current_host == scheduler_host(hosts):
save(model_dir, mlp_model)


def save(model_dir, model):
model.symbol.save(os.path.join(model_dir, 'model-symbol.json'))
model.save_params(os.path.join(model_dir, 'model-0000.params'))

signature = [{'name': data_desc.name, 'shape': [dim for dim in data_desc.shape]}
for data_desc in model.data_shapes]
with open(os.path.join(model_dir, 'model-shapes.json'), 'w') as f:
json.dump(signature, f)


if __name__ == '__main__':
parser = argparse.ArgumentParser()

parser.add_argument('--batch-size', type=int, default=100)
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--learning-rate', type=float, default=0.1)

parser.add_argument('--model-dir', type=str, default=os.environ['SM_MODEL_DIR'])
parser.add_argument('--train', type=str, default=os.environ['SM_CHANNEL_TRAIN'])
parser.add_argument('--test', type=str, default=os.environ['SM_CHANNEL_TEST'])

parser.add_argument('--current-host', type=str, default=os.environ['SM_CURRENT_HOST'])
parser.add_argument('--hosts', type=list, default=json.loads(os.environ['SM_HOSTS']))

args = parser.parse_args()

num_gpus = int(os.environ['SM_NUM_GPUS'])

train(args.batch_size, args.epochs, args.learning_rate, num_gpus, args.train, args.test,
args.hosts, args.current_host, args.model_dir)
4 changes: 2 additions & 2 deletions tests/integ/test_mxnet_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def mxnet_training_job(sagemaker_session, mxnet_full_version):

mx = MXNet(entry_point=script_path, role='SageMakerRole', framework_version=mxnet_full_version,
py_version=PYTHON_VERSION, train_instance_count=1, train_instance_type='ml.c4.xlarge',
sagemaker_session=sagemaker_session)
launch_parameter_server=True, sagemaker_session=sagemaker_session)

train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
key_prefix='integ-test-data/mxnet_mnist/train')
Expand Down Expand Up @@ -80,7 +80,7 @@ def test_async_fit(sagemaker_session):

mx = MXNet(entry_point=script_path, role='SageMakerRole', py_version=PYTHON_VERSION,
train_instance_count=1, train_instance_type='ml.c4.xlarge',
sagemaker_session=sagemaker_session)
launch_parameter_server=True, sagemaker_session=sagemaker_session)

train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
key_prefix='integ-test-data/mxnet_mnist/train')
Expand Down
1 change: 1 addition & 0 deletions tests/integ/test_tuner.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def test_tuning_mxnet(sagemaker_session):
py_version=PYTHON_VERSION,
train_instance_count=1,
train_instance_type='ml.m4.xlarge',
framework_version='1.2.1',
sagemaker_session=sagemaker_session,
base_job_name='tune-mxnet')

Expand Down
25 changes: 25 additions & 0 deletions tests/unit/test_mxnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ def test_mxnet(strftime, sagemaker_session, mxnet_version):
expected_train_args = _create_train_job(mxnet_version)
expected_train_args['input_config'][0]['DataSource']['S3DataSource']['S3Uri'] = inputs

if mx._script_mode_version():
expected_train_args['hyperparameters'][mx.LAUNCH_PS_ENV_NAME] = json.dumps(False)

actual_train_args = sagemaker_session.method_calls[0][2]
assert actual_train_args == expected_train_args

Expand Down Expand Up @@ -372,3 +375,25 @@ def test_attach_custom_image(sagemaker_session):
estimator = MXNet.attach(training_job_name='neo', sagemaker_session=sagemaker_session)
assert estimator.image_name == training_image
assert estimator.train_image() == training_image


def test_estimator_script_mode_launch_parameter_server(sagemaker_session):
mx = MXNet(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session,
train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE,
launch_parameter_server=True, framework_version='1.3.0')
assert mx.hyperparameters().get(MXNet.LAUNCH_PS_ENV_NAME) == 'true'


def test_estimator_script_mode_dont_launch_parameter_server(sagemaker_session):
mx = MXNet(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session,
train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE,
launch_parameter_server=False, framework_version='1.3.0')
assert mx.hyperparameters().get(MXNet.LAUNCH_PS_ENV_NAME) == 'false'


def test_estimator_wrong_version_launch_parameter_server(sagemaker_session):
with pytest.raises(ValueError) as e:
MXNet(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session,
train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE,
launch_parameter_server=True, framework_version='1.2.1')
assert 'launch_parameter_server is used for only versions 1.3 and higher' in str(e)