Skip to content

Add support for file:// URI as the input for LocalMode training data #168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
CHANGELOG
=========


1.2.dev5
========

* bug-fix: Change module names to string type in __all__
* feature: Local Mode: add support for local training data using file://

1.2.4
=====
Expand Down
7 changes: 5 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Local Mode
~~~~~~~~~~

The SageMaker Python SDK now supports local mode, which allows you to create TensorFlow, MXNet and BYO estimators and
deploy to your local environment.  This is a great way to test your deep learning script before running in
deploy to your local environment. This is a great way to test your deep learning script before running in
SageMaker's managed training or hosting environments.

We can take the example in `Estimator Usage <#estimator-usage>`__ , and use either ``local`` or ``local_gpu`` as the
Expand All @@ -166,6 +166,9 @@ instance type.
# In Local Mode, fit will pull the MXNet container docker image and run it locally
mxnet_estimator.fit('s3://my_bucket/my_training_data/')

# Alternatively, you can train using data in your local file system. This is only supported in Local mode.
mxnet_estimator.fit('file:///tmp/my_training_data')

# Deploys the model that was generated by fit() to local endpoint in a container
mxnet_predictor = mxnet_estimator.deploy(initial_instance_count=1, instance_type='local')

Expand All @@ -184,7 +187,7 @@ For detailed examples of running docker in local mode, see:
A few important notes:

- Only one local mode endpoint can be running at a time
- Since the data are pulled from S3 to your local environment, please ensure you have sufficient space.
- If you are using S3 data as input, it will be pulled from S3 to your local environment, so please ensure you have sufficient disk space.
- If you run into problems, this is often due to different docker containers conflicting.  Killing these containers and re-running often solves your problems.
- Local Mode requires docker-compose and `nvidia-docker2 <https://github.com/NVIDIA/nvidia-docker>`__ for ``local_gpu``.
- Distributed training is not yet supported for ``local_gpu``.
Expand Down
35 changes: 26 additions & 9 deletions src/sagemaker/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
from sagemaker.fw_utils import tar_and_upload_dir
from sagemaker.fw_utils import parse_s3_url
from sagemaker.fw_utils import UploadedCode
from sagemaker.local.local_session import LocalSession
from sagemaker.local.local_session import LocalSession, file_input

from sagemaker.model import Model
from sagemaker.model import (SCRIPT_PARAM_NAME, DIR_PARAM_NAME, CLOUDWATCH_METRICS_PARAM_NAME,
CONTAINER_LOG_LEVEL_PARAM_NAME, JOB_NAME_PARAM_NAME, SAGEMAKER_REGION_PARAM_NAME)

from sagemaker.predictor import RealTimePredictor
from sagemaker.session import Session
from sagemaker.session import s3_input
Expand Down Expand Up @@ -321,6 +323,13 @@ def start_new(cls, estimator, inputs):
sagemaker.estimator.Framework: Constructed object that captures all information about the started job.
"""

local_mode = estimator.local_mode

# Allow file:// input only in local mode
if isinstance(inputs, str) and inputs.startswith('file://'):
if not local_mode:
raise ValueError('File URIs are supported in local mode only. Please use a S3 URI instead.')

input_config = _TrainingJob._format_inputs_to_input_config(inputs)
role = estimator.sagemaker_session.expand_role(estimator.role)
output_config = _TrainingJob._prepare_output_config(estimator.output_path, estimator.output_kms_key)
Expand All @@ -343,12 +352,14 @@ def start_new(cls, estimator, inputs):
def _format_inputs_to_input_config(inputs):
input_dict = {}
if isinstance(inputs, string_types):
input_dict['training'] = _TrainingJob._format_s3_uri_input(inputs)
input_dict['training'] = _TrainingJob._format_string_uri_input(inputs)
elif isinstance(inputs, s3_input):
input_dict['training'] = inputs
elif isinstance(input, file_input):
input_dict['training'] = inputs
elif isinstance(inputs, dict):
for k, v in inputs.items():
input_dict[k] = _TrainingJob._format_s3_uri_input(v)
input_dict[k] = _TrainingJob._format_string_uri_input(v)
else:
raise ValueError('Cannot format input {}. Expecting one of str, dict or s3_input'.format(inputs))

Expand All @@ -360,15 +371,21 @@ def _format_inputs_to_input_config(inputs):
return channels

@staticmethod
def _format_s3_uri_input(input):
def _format_string_uri_input(input):
if isinstance(input, str):
if not input.startswith('s3://'):
raise ValueError('Training input data must be a valid S3 URI and must start with "s3://"')
return s3_input(input)
if isinstance(input, s3_input):
if input.startswith('s3://'):
return s3_input(input)
elif input.startswith('file://'):
return file_input(input)
else:
raise ValueError('Training input data must be a valid S3 or FILE URI: must start with "s3://" or '
'"file://"')
elif isinstance(input, s3_input):
return input
elif isinstance(input, file_input):
return input
else:
raise ValueError('Cannot format input {}. Expecting one of str or s3_input'.format(input))
raise ValueError('Cannot format input {}. Expecting one of str, s3_input, or file_input'.format(input))

@staticmethod
def _prepare_output_config(s3_path, kms_key_id):
Expand Down
13 changes: 11 additions & 2 deletions src/sagemaker/local/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,13 @@ def train(self, input_data_config, hyperparameters):
# mount the local directory to the container. For S3 Data we will download the S3 data
# first.
for channel in input_data_config:
uri = channel['DataSource']['S3DataSource']['S3Uri']
if channel['DataSource'] and 'S3DataSource' in channel['DataSource']:
uri = channel['DataSource']['S3DataSource']['S3Uri']
elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']:
uri = channel['DataSource']['FileDataSource']['FileUri']
else:
raise ValueError('Need channel[\'DataSource\'] to have [\'S3DataSource\'] or [\'FileDataSource\']')

parsed_uri = urlparse(uri)
key = parsed_uri.path.lstrip('/')

Expand All @@ -104,8 +110,11 @@ def train(self, input_data_config, hyperparameters):
if parsed_uri.scheme == 's3':
bucket_name = parsed_uri.netloc
self._download_folder(bucket_name, key, channel_dir)
elif parsed_uri.scheme == 'file':
path = parsed_uri.path
volumes.append(_Volume(path, channel=channel_name))
else:
volumes.append(_Volume(uri, channel=channel_name))
raise ValueError('Unknown URI scheme {}'.format(parsed_uri.scheme))

# Create the configuration files for each container that we will create
# Each container will map the additional local volumes (if any).
Expand Down
34 changes: 32 additions & 2 deletions src/sagemaker/local/local_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,22 @@ def create_training_job(self, TrainingJobName, AlgorithmSpecification, RoleArn,
AlgorithmSpecification['TrainingImage'], self.sagemaker_session)

for channel in InputDataConfig:
data_distribution = channel['DataSource']['S3DataSource']['S3DataDistributionType']

if channel['DataSource'] and 'S3DataSource' in channel['DataSource']:
data_distribution = channel['DataSource']['S3DataSource']['S3DataDistributionType']
elif channel['DataSource'] and 'FileDataSource' in channel['DataSource']:
data_distribution = channel['DataSource']['FileDataSource']['FileDataDistributionType']
else:
raise ValueError('Need channel[\'DataSource\'] to have [\'S3DataSource\'] or [\'FileDataSource\']')

if data_distribution != 'FullyReplicated':
raise RuntimeError("DataDistribution: %s is not currently supported in Local Mode" %
data_distribution)

self.s3_model_artifacts = self.train_container.train(InputDataConfig, HyperParameters)

def describe_training_job(self, TrainingJobName):
"""Describe a local traininig job.
"""Describe a local training job.

Args:
TrainingJobName (str): Not used in this implementation.
Expand Down Expand Up @@ -171,3 +178,26 @@ def logs_for_job(self, job_name, wait=False, poll=5):
# override logs_for_job() as it doesn't need to perform any action
# on local mode.
pass


class file_input(object):
    """Channel configuration for a FILE data source, used by Amazon SageMaker local mode.

    Attributes:
        config (dict[str, dict]): A SageMaker ``DataSource`` referencing a SageMaker ``FileDataSource``.
    """

    def __init__(self, fileUri, content_type=None):
        """Build the channel definition for input data of a local-mode training job.

        Args:
            fileUri (str): Location of the data. Stored verbatim under ``FileUri``; no
                validation is done here (typically a ``file://`` URI).
            content_type (str): Optional value stored under ``ContentType``. The key is
                left out of ``config`` entirely when this is None.
        """
        # The distribution type is fixed to 'FullyReplicated' for file data sources.
        file_data_source = {
            'FileDataDistributionType': 'FullyReplicated',
            'FileUri': fileUri,
        }
        channel = {'DataSource': {'FileDataSource': file_data_source}}

        if content_type is not None:
            channel['ContentType'] = content_type

        self.config = channel
5 changes: 2 additions & 3 deletions tests/unit/test_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def test_sagemaker_s3_uri_invalid(sagemaker_session):
t = DummyFramework(entry_point=SCRIPT_PATH, role=ROLE, sagemaker_session=sagemaker_session,
train_instance_count=INSTANCE_COUNT, train_instance_type=INSTANCE_TYPE)
t.fit('thisdoesntstartwiths3')
assert 'must be a valid S3 URI' in str(error)
assert 'must be a valid S3 or FILE URI' in str(error)


@patch('time.strftime', return_value=TIMESTAMP)
Expand Down Expand Up @@ -427,9 +427,8 @@ def test_unsupported_type():


def test_unsupported_type_in_dict():
with pytest.raises(ValueError) as error:
with pytest.raises(ValueError):
_TrainingJob._format_inputs_to_input_config({'a': 66})
assert 'Expecting one of str or s3_input' in str(error)


#################################################################################
Expand Down
7 changes: 3 additions & 4 deletions tests/unit/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
{
'ChannelName': 'a',
'DataSource': {
'S3DataSource': {
'S3DataDistributionType': 'FullyReplicated',
'S3DataType': 'S3Prefix',
'S3Uri': '/tmp/source1'
'FileDataSource': {
'FileDataDistributionType': 'FullyReplicated',
'FileUri': 'file:///tmp/source1'
}
}
},
Expand Down
113 changes: 112 additions & 1 deletion tests/unit/test_local_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,26 @@ def test_create_training_job(train, LocalSession):
image = "my-docker-image:1.0"

algo_spec = {'TrainingImage': image}
input_data_config = {}
input_data_config = [
{
'ChannelName': 'a',
'DataSource': {
'S3DataSource': {
'S3DataDistributionType': 'FullyReplicated',
'S3Uri': 's3://my_bucket/tmp/source1'
}
}
},
{
'ChannelName': 'b',
'DataSource': {
'FileDataSource': {
'FileDataDistributionType': 'FullyReplicated',
'FileUri': 'file:///tmp/source1'
}
}
}
]
output_data_config = {}
resource_config = {'InstanceType': 'local', 'InstanceCount': instance_count}
hyperparameters = {'a': 1, 'b': 'bee'}
Expand All @@ -61,6 +80,67 @@ def test_create_training_job(train, LocalSession):
assert response['ModelArtifacts']['S3ModelArtifacts'] == expected['ModelArtifacts']['S3ModelArtifacts']


@patch('sagemaker.local.image._SageMakerContainer.train', return_value="/some/path/to/model")
@patch('sagemaker.local.local_session.LocalSession')
def test_create_training_job_invalid_data_source(LocalSession, train):
    """create_training_job must reject a channel whose DataSource is neither S3 nor File.

    Stacked ``@patch`` decorators inject their mocks bottom-up, so the first
    parameter is the ``LocalSession`` mock and the second is the ``train`` mock.
    """
    local_sagemaker_client = sagemaker.local.local_session.LocalSagemakerClient()

    instance_count = 2
    image = "my-docker-image:1.0"

    algo_spec = {'TrainingImage': image}

    # InvalidDataSource is not supported. S3DataSource and FileDataSource are currently the only
    # valid Data Sources. We expect a ValueError if we pass this input data config.
    input_data_config = [{
        'ChannelName': 'a',
        'DataSource': {
            'InvalidDataSource': {
                'FileDataDistributionType': 'FullyReplicated',
                'FileUri': 'ftp://myserver.com/tmp/source1'
            }
        }
    }]

    output_data_config = {}
    resource_config = {'InstanceType': 'local', 'InstanceCount': instance_count}
    hyperparameters = {'a': 1, 'b': 'bee'}

    with pytest.raises(ValueError):
        local_sagemaker_client.create_training_job("my-training-job", algo_spec, 'arn:my-role', input_data_config,
                                                   output_data_config, resource_config, None, hyperparameters)

    # Validation happens before the container is started, so training must never run.
    train.assert_not_called()


@patch('sagemaker.local.image._SageMakerContainer.train', return_value="/some/path/to/model")
@patch('sagemaker.local.local_session.LocalSession')
def test_create_training_job_not_fully_replicated(LocalSession, train):
    """create_training_job must reject any data distribution other than FullyReplicated.

    Stacked ``@patch`` decorators inject their mocks bottom-up, so the first
    parameter is the ``LocalSession`` mock and the second is the ``train`` mock.
    """
    local_sagemaker_client = sagemaker.local.local_session.LocalSagemakerClient()

    instance_count = 2
    image = "my-docker-image:1.0"

    algo_spec = {'TrainingImage': image}

    # Local Mode only supports FullyReplicated as Data Distribution type.
    input_data_config = [{
        'ChannelName': 'a',
        'DataSource': {
            'S3DataSource': {
                'S3DataDistributionType': 'ShardedByS3Key',
                'S3Uri': 's3://my_bucket/tmp/source1'
            }
        }
    }]

    output_data_config = {}
    resource_config = {'InstanceType': 'local', 'InstanceCount': instance_count}
    hyperparameters = {'a': 1, 'b': 'bee'}

    with pytest.raises(RuntimeError):
        local_sagemaker_client.create_training_job("my-training-job", algo_spec, 'arn:my-role', input_data_config,
                                                   output_data_config, resource_config, None, hyperparameters)

    # The distribution check fails before the container is started, so training must never run.
    train.assert_not_called()


@patch('sagemaker.local.local_session.LocalSession')
def test_create_model(LocalSession):
local_sagemaker_client = sagemaker.local.local_session.LocalSagemakerClient()
Expand Down Expand Up @@ -130,3 +210,34 @@ def test_create_endpoint_fails(serve, request, LocalSession):

with pytest.raises(RuntimeError):
local_sagemaker_client.create_endpoint('my-endpoint', 'some-endpoint-config')


def test_file_input_all_defaults():
    """With only a URI, file_input yields a FullyReplicated FileDataSource and no ContentType."""
    uri = 'pre'
    channel = sagemaker.local.local_session.file_input(fileUri=uri)

    file_data_source = {'FileDataDistributionType': 'FullyReplicated', 'FileUri': uri}
    assert channel.config == {'DataSource': {'FileDataSource': file_data_source}}


def test_file_input_content_type():
    """A content_type argument is surfaced as a top-level ContentType entry in the config."""
    uri = 'pre'
    channel = sagemaker.local.local_session.file_input(fileUri=uri, content_type='text/csv')

    file_data_source = {'FileDataDistributionType': 'FullyReplicated', 'FileUri': uri}
    wanted = {
        'DataSource': {'FileDataSource': file_data_source},
        'ContentType': 'text/csv',
    }
    assert channel.config == wanted