
Create parameter server in different thread #127

Closed · wants to merge 14 commits

Changes from 2 commits
34 changes: 17 additions & 17 deletions src/sagemaker_tensorflow_container/training.py
@@ -17,16 +17,16 @@
import logging
import os
import subprocess
import threading
import time

import sagemaker_containers.beta.framework as framework
import tensorflow as tf

import sagemaker_tensorflow_container.s3_utils as s3_utils

from sagemaker_tensorflow_container import s3_utils

logger = logging.getLogger(__name__)


SAGEMAKER_PARAMETER_SERVER_ENABLED = 'sagemaker_parameter_server_enabled'


@@ -90,27 +90,27 @@ def host_addresses(hosts, port=2222):

def _env_vars_with_tf_config(env, ps_task):
env_vars = env.to_env_vars()
env_vars['TF_CONFIG'] = json.dumps(_build_tf_config(
hosts=env.hosts,
current_host=env.current_host,
ps_task=ps_task))
tf_config = _build_tf_config(hosts=env.hosts,
current_host=env.current_host,
ps_task=ps_task)

env_vars['TF_CONFIG'] = json.dumps(tf_config, sort_keys=True)
return env_vars


def _run_ps(env):
env_vars = _env_vars_with_tf_config(env, ps_task=True)
# Parameter server processes should always run on CPU. Setting CUDA_VISIBLE_DEVICES to '-1'
# forces TensorFlow to use the CPU.
env_vars['CUDA_VISIBLE_DEVICES'] = json.dumps(-1)
framework.entry_point.run(env.module_dir, env.user_entry_point,
env.to_cmd_args(), env_vars, wait=False)
logger.info('Running distributed training job with parameter servers')

tf_config = _build_tf_config(hosts=env.hosts, current_host=env.current_host)
Contributor:
Should we just remove the ps_task parameter in _build_tf_config? It's not used anymore. Not critical; if you don't want to fiddle with the unit tests, we can do it later.
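For reference, a minimal sketch of what _build_tf_config might look like with ps_task dropped. The specifics here (first host serving as master, every host also running a parameter server on port 2223, and the host_addresses helper) are assumptions pieced together from the surrounding diff and the unit tests, not the actual implementation:

def _build_tf_config(hosts, current_host):
    masters = hosts[:1]
    workers = hosts[1:]

    def host_addresses(hosts, port=2222):
        return ['{}:{}'.format(host, port) for host in hosts]

    cluster = {'master': host_addresses(masters),
               'ps': host_addresses(hosts, port=2223)}
    if workers:
        cluster['worker'] = host_addresses(workers)

    # Without ps_task, the task type is derived purely from the host's role;
    # _run_ps now computes its own task index from env.hosts instead.
    if current_host in masters:
        task = {'index': masters.index(current_host), 'type': 'master'}
    else:
        task = {'index': workers.index(current_host), 'type': 'worker'}

    return {'cluster': cluster, 'environment': 'cloud', 'task': task}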

cluster_spec = tf.train.ClusterSpec(tf_config['cluster'])
task_index = env.hosts.index(env.current_host)

server = tf.train.Server(cluster_spec, job_name='ps', task_index=task_index)

threading.Thread(target=lambda: server.join()).start()


def _run_worker(env):
# When _run_ps is called, CUDA_VISIBLE_DEVICES is set with os.environ.
# We need to unset it so the worker process can use the GPUs.
if os.environ.get('CUDA_VISIBLE_DEVICES'):
del os.environ['CUDA_VISIBLE_DEVICES']
env_vars = _env_vars_with_tf_config(env, ps_task=False)
framework.entry_point.run(env.module_dir, env.user_entry_point, env.to_cmd_args(), env_vars)

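Taken together, the new _run_ps follows a simple pattern: build a tf.train.ClusterSpec from the cluster definition, start a tf.train.Server for the 'ps' job, and park server.join() on a background thread so the main thread stays free to launch the actual training process. A self-contained sketch of that pattern, with illustrative host names and ports:

import threading

import tensorflow as tf

cluster_spec = tf.train.ClusterSpec({'master': ['host1:2222'],
                                     'worker': ['host2:2222'],
                                     'ps': ['host1:2223', 'host2:2223']})

# task_index selects which of the 'ps' addresses this process serves
server = tf.train.Server(cluster_spec, job_name='ps', task_index=0)

# server.join() blocks forever, so run it on a separate thread; threads are
# non-daemon by default, which keeps the parameter server alive for the
# lifetime of the process
threading.Thread(target=server.join).start()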
127 changes: 51 additions & 76 deletions test/unit/test_training.py
@@ -14,8 +14,6 @@

import json
import os
import subprocess
import sys

from mock import MagicMock, patch
import pytest
@@ -45,27 +43,30 @@

@pytest.fixture
def distributed_training_env():
return MagicMock(module_dir=MODULE_DIR,
user_entry_point=MODULE_NAME,
hyperparameters={},
log_level=LOG_LEVEL,
hosts=HOST_LIST,
current_host=CURRENT_HOST,
to_env_vars=lambda: {},
additional_framework_parameters={
training.SAGEMAKER_PARAMETER_SERVER_ENABLED: True
})
env = simple_training_env()

env.hosts = HOST_LIST
env.additional_framework_parameters = {
training.SAGEMAKER_PARAMETER_SERVER_ENABLED: True
}
return env


@pytest.fixture
def single_machine_training_env():
env = MagicMock()
return simple_training_env()


def simple_training_env():
env = MagicMock()
env.module_dir = MODULE_DIR
env.user_entry_point = MODULE_NAME
env.hyperparameters = {'model_dir': MODEL_DIR}
env.log_level = LOG_LEVEL

env.additional_framework_parameters = {}
env.hosts = CURRENT_HOST
env.current_host = CURRENT_HOST
env.to_env_vars = lambda: {}
return env


@@ -83,73 +84,59 @@ def test_single_machine(run_module, single_machine_training_env):
single_machine_training_env.to_env_vars())


@pytest.mark.skipif(sys.version_info.major != 3,
                    reason="Skip this for python 2 because of dict key order mismatch")

Contributor:

These tests are flaky on py2 because of the dict key ordering. I think we should add the skipif back.
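The flakiness most likely comes from asserting on TF_CONFIG as a raw string: without sort_keys=True, json.dumps serializes keys in dict iteration order, which is not stable on Python 2. A small illustration with made-up values:

import json

tf_config = {'task': {'index': 0, 'type': 'master'},
             'environment': 'cloud',
             'cluster': {'master': ['host1:2222']}}

# On py2, dict iteration order depends on key hashing, so this string can
# differ between interpreters (and, with hash randomization, between runs)
json.dumps(tf_config)

# sort_keys=True makes the serialization deterministic and therefore safe
# to compare as a string
json.dumps(tf_config, sort_keys=True)
# -> '{"cluster": {"master": ["host1:2222"]}, "environment": "cloud", "task": {"index": 0, "type": "master"}}'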
@patch('tensorflow.train.ClusterSpec')
@patch('tensorflow.train.Server')
@patch('sagemaker_containers.beta.framework.entry_point.run')
@patch('threading.Thread', lambda target: target())
@patch('time.sleep', MagicMock())
def test_train_distributed_master(run, distributed_training_env):
def test_train_distributed_master(run, tf_server, cluster_spec, distributed_training_env):
training.train(distributed_training_env)

ps_tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "ps"}}'

run.assert_any_call('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
{'TF_CONFIG': ps_tf_config, 'CUDA_VISIBLE_DEVICES': '-1'},
wait=False)

master_tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "master"}}'
cluster_spec.assert_called_with({'worker': ['host2:2222'],
'master': ['host1:2222'],
'ps': ['host1:2223', 'host2:2223']})

tf_server.assert_called_with(cluster_spec(), job_name='ps', task_index=0)
tf_server().join.assert_called_with()

tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "master"}}'

run.assert_called_with('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
{'TF_CONFIG': master_tf_config})
{'TF_CONFIG': tf_config})


@pytest.mark.skipif(sys.version_info.major != 3,
reason="Skip this for python 2 because of dict key order mismatch")
@patch('subprocess.check_call')
@patch('time.sleep', MagicMock())
@patch('tensorflow.train.ClusterSpec')
@patch('tensorflow.train.Server')
@patch('sagemaker_containers.beta.framework.entry_point.run')
def test_train_distributed_worker(run,
check_call,
distributed_training_env):
@patch('time.sleep', MagicMock())
def test_train_distributed_worker(run, tf_server, cluster_spec, distributed_training_env):
distributed_training_env.current_host = HOST2
check_call.side_effect = subprocess.CalledProcessError(returncode=1, cmd=[])

training.train(distributed_training_env)

ps_tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 1, "type": "ps"}}'

run.assert_any_call('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
{'TF_CONFIG': ps_tf_config, 'CUDA_VISIBLE_DEVICES': '-1'},
wait=False)

master_tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "worker"}}'
cluster_spec.assert_called_with({'worker': ['host2:2222'],
'master': ['host1:2222'],
'ps': ['host1:2223', 'host2:2223']})

tf_server.assert_called_with(cluster_spec(), job_name='ps', task_index=1)
tf_server().join.assert_called_with()

tf_config = '{"cluster": {' \
'"master": ["host1:2222"], ' \
'"ps": ["host1:2223", "host2:2223"], ' \
'"worker": ["host2:2222"]}, ' \
'"environment": "cloud", ' \
'"task": {"index": 0, "type": "worker"}}'

run.assert_called_with('s3://my/bucket', 'script_name',
distributed_training_env.to_cmd_args(),
{
'TF_CONFIG': master_tf_config})
{'TF_CONFIG': tf_config})


@patch('sagemaker_containers.beta.framework.entry_point.run')
@@ -174,18 +161,6 @@ def test_get_env_vars_with_tf_config(build_tf_config, distributed_training_env):
hosts=HOST_LIST, current_host=CURRENT_HOST, ps_task=True)


@patch('sagemaker_containers.beta.framework.entry_point.run')
@patch('sagemaker_tensorflow_container.training._env_vars_with_tf_config')
def test_run_ps(env_vars_with_tf_config, run, distributed_training_env):
training._run_ps(distributed_training_env)
env_vars_with_tf_config.assert_called_once_with(distributed_training_env, ps_task=True)

run.assert_called_once_with(distributed_training_env.module_dir,
distributed_training_env.user_entry_point,
distributed_training_env.to_cmd_args(), env_vars_with_tf_config(),
wait=False)


def test_build_tf_config():
assert training._build_tf_config(HOST_LIST, HOST1) == {
'cluster': CLUSTER_WITH_PS,