Skip to content

Fix model_dir adjustment for hyperparameter tuning jobs #181

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 20 additions & 12 deletions src/sagemaker_tensorflow_container/training.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2018-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'). You
# may not use this file except in compliance with the License. A copy of
Expand All @@ -10,7 +10,6 @@
# distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

from __future__ import absolute_import

import json
Expand Down Expand Up @@ -106,11 +105,11 @@ def _run_ps(env, cluster):
threading.Thread(target=lambda: server.join()).start()


def _run_worker(env, tf_config):
def _run_worker(env, cmd_args, tf_config):
env_vars = env.to_env_vars()
env_vars['TF_CONFIG'] = json.dumps(tf_config)

framework.entry_point.run(env.module_dir, env.user_entry_point, env.to_cmd_args(), env_vars)
framework.entry_point.run(env.module_dir, env.user_entry_point, cmd_args, env_vars)


def _wait_until_master_is_down(master):
Expand All @@ -125,7 +124,7 @@ def _wait_until_master_is_down(master):
return


def train(env):
def train(env, cmd_args):
"""Get training job environment from env and run the training job.

Args:
Expand All @@ -141,7 +140,7 @@ def train(env):
logger.info('Launching parameter server process')
_run_ps(env, tf_config['cluster'])
logger.info('Launching worker process')
_run_worker(env, tf_config)
_run_worker(env, cmd_args, tf_config)

if not _is_host_master(env.hosts, env.current_host):
_wait_until_master_is_down(env.hosts[0])
Expand All @@ -155,8 +154,7 @@ def train(env):
else:
runner_type = framework.runner.ProcessRunnerType

framework.entry_point.run(env.module_dir, env.user_entry_point,
env.to_cmd_args(), env.to_env_vars(),
framework.entry_point.run(env.module_dir, env.user_entry_point, cmd_args, env.to_env_vars(),
runner=runner_type)


Expand Down Expand Up @@ -187,18 +185,28 @@ def _log_model_missing_warning(model_dir):
'https://www.tensorflow.org/guide/saved_model#structure_of_a_savedmodel_directory')


def _model_dir_with_training_job(model_dir, job_name):
if model_dir.startswith('/opt/ml'):
return model_dir
else:
return '{}/{}/model'.format(model_dir, job_name)


def main():
"""Training entry point
"""
hyperparameters = framework.env.read_hyperparameters()
env = framework.training_env(hyperparameters=hyperparameters)

user_hyperparameters = env.hyperparameters

# If the training job is part of the multiple training jobs for tuning, we need to append the training job name to
# model_dir in case they read from/write to the same object
if '_tuning_objective_metric' in hyperparameters:
env.hyperparameters['model_dir'] = os.path.join(hyperparameters.get('model_dir'), env.job_name, 'checkpoints')
model_dir = _model_dir_with_training_job(hyperparameters.get('model_dir'), env.job_name)
logger.info('Appending the training job name to model_dir: {}'.format(model_dir))
user_hyperparameters['model_dir'] = model_dir

s3_utils.configure(env.hyperparameters.get('model_dir'), os.environ.get('SAGEMAKER_REGION'))
logger.setLevel(env.log_level)
train(env)
s3_utils.configure(user_hyperparameters.get('model_dir'), os.environ.get('SAGEMAKER_REGION'))
train(env, framework.mapping.to_cmd_args(user_hyperparameters))
_log_model_missing_warning(MODEL_DIR)
50 changes: 31 additions & 19 deletions test/unit/test_training.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Copyright 2017-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
Expand Down Expand Up @@ -40,6 +40,7 @@
PS_TASK_1 = {'index': 0, 'type': 'ps'}
PS_TASK_2 = {'index': 1, 'type': 'ps'}
MODEL_DIR = 's3://bucket/prefix'
MODEL_DIR_CMD_LIST = ['--model_dir', MODEL_DIR]
REGION = 'us-west-2'
RESOURCE_PATH = os.path.join(os.path.dirname(__file__), '..', 'resources')

Expand Down Expand Up @@ -82,9 +83,8 @@ def test_is_host_master():

@patch('sagemaker_containers.beta.framework.entry_point.run')
def test_single_machine(run_module, single_machine_training_env):
training.train(single_machine_training_env)
run_module.assert_called_with(MODULE_DIR, MODULE_NAME,
single_machine_training_env.to_cmd_args(),
training.train(single_machine_training_env, MODEL_DIR_CMD_LIST)
run_module.assert_called_with(MODULE_DIR, MODULE_NAME, MODEL_DIR_CMD_LIST,
single_machine_training_env.to_env_vars(),
runner=runner.ProcessRunnerType)

Expand All @@ -93,9 +93,8 @@ def test_single_machine(run_module, single_machine_training_env):
def test_train_horovod(run_module, single_machine_training_env):
single_machine_training_env.additional_framework_parameters['sagemaker_mpi_enabled'] = True

training.train(single_machine_training_env)
run_module.assert_called_with(MODULE_DIR, MODULE_NAME,
single_machine_training_env.to_cmd_args(),
training.train(single_machine_training_env, MODEL_DIR_CMD_LIST)
run_module.assert_called_with(MODULE_DIR, MODULE_NAME, MODEL_DIR_CMD_LIST,
single_machine_training_env.to_env_vars(),
runner=runner.MPIRunnerType)

Expand All @@ -108,7 +107,7 @@ def test_train_horovod(run_module, single_machine_training_env):
@patch('threading.Thread', lambda target: target())
@patch('time.sleep', MagicMock())
def test_train_distributed_master(run, tf_server, cluster_spec, distributed_training_env):
training.train(distributed_training_env)
training.train(distributed_training_env, MODEL_DIR_CMD_LIST)

cluster_spec.assert_called_with({'worker': ['host2:2222'],
'master': ['host1:2222'],
Expand All @@ -126,8 +125,7 @@ def test_train_distributed_master(run, tf_server, cluster_spec, distributed_trai
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "master"}}'

run.assert_called_with('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
run.assert_called_with('s3://my/bucket', 'script_name', MODEL_DIR_CMD_LIST,
{'TF_CONFIG': tf_config})


Expand All @@ -140,7 +138,7 @@ def test_train_distributed_master(run, tf_server, cluster_spec, distributed_trai
def test_train_distributed_worker(run, tf_server, cluster_spec, distributed_training_env):
distributed_training_env.current_host = HOST2

training.train(distributed_training_env)
training.train(distributed_training_env, MODEL_DIR_CMD_LIST)

cluster_spec.assert_called_with({'worker': ['host2:2222'],
'master': ['host1:2222'],
Expand All @@ -158,8 +156,7 @@ def test_train_distributed_worker(run, tf_server, cluster_spec, distributed_trai
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "worker"}}'

run.assert_called_with('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
run.assert_called_with('s3://my/bucket', 'script_name', MODEL_DIR_CMD_LIST,
{'TF_CONFIG': tf_config})


Expand All @@ -168,9 +165,9 @@ def test_train_distributed_no_ps(run, distributed_training_env):
distributed_training_env.additional_framework_parameters[
training.SAGEMAKER_PARAMETER_SERVER_ENABLED] = False
distributed_training_env.current_host = HOST2
training.train(distributed_training_env)
training.train(distributed_training_env, MODEL_DIR_CMD_LIST)

run.assert_called_with(MODULE_DIR, MODULE_NAME, distributed_training_env.to_cmd_args(),
run.assert_called_with(MODULE_DIR, MODULE_NAME, MODEL_DIR_CMD_LIST,
distributed_training_env.to_env_vars(), runner=runner.ProcessRunnerType)


Expand Down Expand Up @@ -251,7 +248,7 @@ def test_main(configure_s3_env, read_hyperparameters, training_env,
training.main()
read_hyperparameters.assert_called_once_with()
training_env.assert_called_once_with(hyperparameters={})
train.assert_called_once_with(single_machine_training_env)
train.assert_called_once_with(single_machine_training_env, MODEL_DIR_CMD_LIST)
configure_s3_env.assert_called_once()


Expand All @@ -276,10 +273,25 @@ def test_main_simple_training_model_dir(configure_s3_env, read_hyperparameters,
@patch('sagemaker_containers.beta.framework.env.read_hyperparameters', return_value={'model_dir': MODEL_DIR,
'_tuning_objective_metric': 'auc'})
@patch('sagemaker_tensorflow_container.s3_utils.configure')
def test_main_tunning_model_dir(configure_s3_env, read_hyperparameters, training_env,
set_level, train, logger, single_machine_training_env):
def test_main_tuning_model_dir(configure_s3_env, read_hyperparameters, training_env,
set_level, train, logger, single_machine_training_env):
training_env.return_value = single_machine_training_env
os.environ['SAGEMAKER_REGION'] = REGION
training.main()
expected_model_dir = os.path.join(MODEL_DIR, single_machine_training_env.job_name, 'checkpoints')
expected_model_dir = '{}/{}/model'.format(MODEL_DIR, single_machine_training_env.job_name)
configure_s3_env.assert_called_once_with(expected_model_dir, REGION)


@patch('sagemaker_tensorflow_container.training.logger')
@patch('sagemaker_tensorflow_container.training.train')
@patch('logging.Logger.setLevel')
@patch('sagemaker_containers.beta.framework.training_env')
@patch('sagemaker_containers.beta.framework.env.read_hyperparameters', return_value={'model_dir': '/opt/ml/model',
'_tuning_objective_metric': 'auc'})
@patch('sagemaker_tensorflow_container.s3_utils.configure')
def test_main_tuning_mpi_model_dir(configure_s3_env, read_hyperparameters, training_env,
set_level, train, logger, single_machine_training_env):
training_env.return_value = single_machine_training_env
os.environ['SAGEMAKER_REGION'] = REGION
training.main()
configure_s3_env.assert_called_once_with('/opt/ml/model', REGION)