Skip to content

Allow Local Serving of Models in S3 #217

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ CHANGELOG
1.4.2
=====

* bug-fix: Unit Tests: Improve unit test runtime
* bug-fix: Estimators: Fix attach for LDA
* bug-fix: Estimators: allow code_location to have no key prefix
* bug-fix: Local Mode: Fix s3 training data download when there is a trailing slash
* feature: Allow Local Serving of Models in S3


1.4.1
Expand Down
65 changes: 47 additions & 18 deletions src/sagemaker/local/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import string
import subprocess
import sys
import tarfile
import tempfile
from fcntl import fcntl, F_GETFL, F_SETFL
from six.moves.urllib.parse import urlparse
Expand Down Expand Up @@ -137,7 +138,7 @@ def serve(self, primary_container):
Args:
primary_container (dict): dictionary containing the container runtime settings
for serving. Expected keys:
- 'ModelDataUrl' pointing to a local file
- 'ModelDataUrl' pointing to a file or s3:// location.
- 'Environment' a dictionary of environment variables to be passed to the hosting container.

"""
Expand All @@ -147,22 +148,17 @@ def serve(self, primary_container):
logger.info('creating hosting dir in {}'.format(self.container_root))

model_dir = primary_container['ModelDataUrl']
if not model_dir.lower().startswith("s3://"):
for h in self.hosts:
host_dir = os.path.join(self.container_root, h)
os.makedirs(host_dir)
shutil.copytree(model_dir, os.path.join(self.container_root, h, 'model'))

volumes = self._prepare_serving_volumes(model_dir)
env_vars = ['{}={}'.format(k, v) for k, v in primary_container['Environment'].items()]

_ecr_login_if_needed(self.sagemaker_session.boto_session, self.image)

# If the user script was passed as a file:// mount it to the container.
script_dir = primary_container['Environment'][sagemaker.estimator.DIR_PARAM_NAME.upper()]
parsed_uri = urlparse(script_dir)
volumes = []
if parsed_uri.scheme == 'file':
volumes.append(_Volume(parsed_uri.path, '/opt/ml/code'))
if sagemaker.estimator.DIR_PARAM_NAME.upper() in primary_container['Environment']:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the case where this is false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BYO :) we only have sagemaker_submit_dir on our Frameworks.

script_dir = primary_container['Environment'][sagemaker.estimator.DIR_PARAM_NAME.upper()]
parsed_uri = urlparse(script_dir)
if parsed_uri.scheme == 'file':
volumes.append(_Volume(parsed_uri.path, '/opt/ml/code'))

_ecr_login_if_needed(self.sagemaker_session.boto_session, self.image)

self._generate_compose_file('serve',
additional_env_vars=env_vars,
Expand Down Expand Up @@ -278,9 +274,20 @@ def _download_folder(self, bucket_name, prefix, target):
pass
obj.download_file(file_path)

def _download_file(self, bucket_name, path, target):
    """Download a single object from S3 to a local file.

    Args:
        bucket_name (str): name of the S3 bucket that holds the object.
        path (str): key of the object; any leading '/' is stripped because
            S3 keys do not start with a slash.
        target (str): local filesystem path to write the downloaded file to.
    """
    key = path.lstrip('/')
    session = self.sagemaker_session.boto_session

    bucket = session.resource('s3').Bucket(bucket_name)
    bucket.download_file(key, target)

def _prepare_training_volumes(self, data_dir, input_data_config, hyperparameters):
shared_dir = os.path.join(self.container_root, 'shared')
model_dir = os.path.join(self.container_root, 'model')
volumes = []

volumes.append(_Volume(model_dir, '/opt/ml/model'))
# Set up the channels for the containers. For local data we will
# mount the local directory to the container. For S3 Data we will download the S3 data
# first.
Expand Down Expand Up @@ -321,6 +328,32 @@ def _prepare_training_volumes(self, data_dir, input_data_config, hyperparameters

return volumes

def _prepare_serving_volumes(self, model_location):
    """Create the volumes that expose the model to the serving container.

    A local ``model_location`` is mounted directly into the container as a
    volume. An ``s3://`` location is downloaded first and, if the download
    is a tar archive, extracted into the host directory that gets mounted.

    Args:
        model_location (str): local path or s3:// URI of the model artifacts.

    Returns:
        list[_Volume]: volumes mapping the model data to /opt/ml/model.
    """
    volumes = []
    host = self.hosts[0]
    host_dir = os.path.join(self.container_root, host)
    os.makedirs(host_dir)

    # Use the same scheme test as serve(): a bare 's3' prefix would also
    # match local paths such as 's3_models/...', and S3 URIs may be
    # capitalized, so check case-insensitively for the full 's3://' scheme.
    if model_location.lower().startswith('s3://'):
        container_model_dir = os.path.join(self.container_root, host, 'model')
        os.makedirs(container_model_dir)

        parsed_uri = urlparse(model_location)
        filename = os.path.basename(parsed_uri.path)
        tar_location = os.path.join(container_model_dir, filename)
        self._download_file(parsed_uri.netloc, parsed_uri.path, tar_location)

        # Only extract when the download is actually a tar archive; a plain
        # file is left in place and served from the model directory as-is.
        if tarfile.is_tarfile(tar_location):
            with tarfile.open(tar_location) as tar:
                tar.extractall(path=container_model_dir)
        volumes.append(_Volume(container_model_dir, '/opt/ml/model'))
    else:
        volumes.append(_Volume(model_location, '/opt/ml/model'))

    return volumes

def _generate_compose_file(self, command, additional_volumes=None, additional_env_vars=None):
"""Writes a config file describing a training/hosting environment.

Expand Down Expand Up @@ -452,10 +485,6 @@ def _build_optml_volumes(self, host, subdirs):
"""
volumes = []

# Ensure that model is in the subdirs
if 'model' not in subdirs:
subdirs.add('model')

for subdir in subdirs:
host_dir = os.path.join(self.container_root, host, subdir)
container_dir = '/opt/ml/{}'.format(subdir)
Expand Down
11 changes: 9 additions & 2 deletions src/sagemaker/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

import sagemaker

from sagemaker.local import LocalSession
from sagemaker.fw_utils import tar_and_upload_dir, parse_s3_url
from sagemaker.session import Session
from sagemaker.utils import name_from_image, get_config_value


class Model(object):
"""An SageMaker ``Model`` that can be deployed to an ``Endpoint``."""
"""A SageMaker ``Model`` that can be deployed to an ``Endpoint``."""

def __init__(self, model_data, image, role, predictor_cls=None, env=None, name=None, sagemaker_session=None):
"""Initialize an SageMaker ``Model``.
Expand All @@ -48,7 +49,7 @@ def __init__(self, model_data, image, role, predictor_cls=None, env=None, name=N
self.predictor_cls = predictor_cls
self.env = env or {}
self.name = name
self.sagemaker_session = sagemaker_session or Session()
self.sagemaker_session = sagemaker_session
self._model_name = None

def prepare_container_def(self, instance_type):
Expand Down Expand Up @@ -86,6 +87,12 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None):
callable[string, sagemaker.session.Session] or None: Invocation of ``self.predictor_cls`` on
the created endpoint name, if ``self.predictor_cls`` is not None. Otherwise, return None.
"""
if not self.sagemaker_session:
if instance_type in ('local', 'local_gpu'):
self.sagemaker_session = LocalSession()
else:
self.sagemaker_session = Session()

container_def = self.prepare_container_def(instance_type)
model_name = self.name or name_from_image(container_def['Image'])
self.sagemaker_session.create_model(model_name, self.role, container_def)
Expand Down
5 changes: 5 additions & 0 deletions src/sagemaker/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def predict(self, data):
response_body.close()
return data

def delete_endpoint(self):
    """Tear down the Amazon SageMaker endpoint that backs this predictor."""
    session = self.sagemaker_session
    session.delete_endpoint(self.endpoint)


class _CsvSerializer(object):
def __init__(self):
Expand Down
74 changes: 73 additions & 1 deletion tests/integ/test_local_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

import boto3
import numpy
import pytest

from sagemaker.local import LocalSession, LocalSagemakerRuntimeClient, LocalSagemakerClient
from sagemaker.mxnet import MXNet
from sagemaker.mxnet import MXNet, MXNetModel
from sagemaker.tensorflow import TensorFlow
from sagemaker.fw_utils import tar_and_upload_dir
from tests.integ import DATA_DIR
from tests.integ.timeout import timeout

Expand Down Expand Up @@ -54,6 +56,25 @@ def _initialize(self, boto_session, sagemaker_client, sagemaker_runtime_client):
self.local_mode = True


@pytest.fixture(scope='module')
def mxnet_model(sagemaker_local_session):
script_path = os.path.join(DATA_DIR, 'mxnet_mnist', 'mnist.py')
data_path = os.path.join(DATA_DIR, 'mxnet_mnist')

mx = MXNet(entry_point=script_path, role='SageMakerRole',
train_instance_count=1, train_instance_type='local',
sagemaker_session=sagemaker_local_session)

train_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'train'),
key_prefix='integ-test-data/mxnet_mnist/train')
test_input = mx.sagemaker_session.upload_data(path=os.path.join(data_path, 'test'),
key_prefix='integ-test-data/mxnet_mnist/test')

mx.fit({'train': train_input, 'test': test_input})
model = mx.create_model(1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the argument necessary here?

return model


def test_tf_local_mode(tf_full_version, sagemaker_local_session):
local_mode_lock_fd = open(LOCK_PATH, 'w')
local_mode_lock = local_mode_lock_fd.fileno()
Expand Down Expand Up @@ -230,6 +251,57 @@ def test_tf_local_data_local_script():
fcntl.lockf(local_mode_lock, fcntl.LOCK_UN)


def test_local_mode_serving_from_s3_model(sagemaker_local_session, mxnet_model):
local_mode_lock_fd = open(LOCK_PATH, 'w')
local_mode_lock = local_mode_lock_fd.fileno()

model_data = mxnet_model.model_data
boto_session = sagemaker_local_session.boto_session
default_bucket = sagemaker_local_session.default_bucket()
uploaded_data = tar_and_upload_dir(boto_session, default_bucket,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is only necessary to create the model data, right? In the normal use cases, the user would either do both training and deploy locally, or create the MXNetModel pointing to local model data, or already have an S3 url with model data, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, this is because I wanted to make this faster by training using local mode instead of training on sagemaker like the chainer tests do for instance.

'test_mxnet_local_mode', '', model_data)

s3_model = MXNetModel(model_data=uploaded_data.s3_prefix, role='SageMakerRole',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is role still necessary here?

Copy link
Contributor Author

@iquintero iquintero Jun 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its not necessary, but the estimators/model expects one. For SageMaker this is a must have, on local mode its not even used but I dont know if we should make this optional.

entry_point=mxnet_model.entry_point, image=mxnet_model.image,
sagemaker_session=sagemaker_local_session)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to explicitly pass a LocalSession here, or is that only for our tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It used to be required, now its only for our tests.


predictor = None
try:
# Since Local Mode uses the same port for serving, we need a lock in order
# to allow concurrent test execution. The serving test is really fast so it still
# makes sense to allow this behavior.
fcntl.lockf(local_mode_lock, fcntl.LOCK_EX)
predictor = s3_model.deploy(initial_instance_count=1, instance_type='local')
data = numpy.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)
finally:
if predictor:
predictor.delete_endpoint()
time.sleep(5)
fcntl.lockf(local_mode_lock, fcntl.LOCK_UN)


def test_local_mode_serving_from_local_model(sagemaker_local_session, mxnet_model):
# Deploys a model trained by the mxnet_model fixture to a Local Mode endpoint
# and runs one prediction against it, then tears the endpoint down.
local_mode_lock_fd = open(LOCK_PATH, 'w')
local_mode_lock = local_mode_lock_fd.fileno()
predictor = None

try:
# Since Local Mode uses the same port for serving, we need a lock in order
# to allow concurrent test execution. The serving test is really fast so it still
# makes sense to allow this behavior.
fcntl.lockf(local_mode_lock, fcntl.LOCK_EX)
# Point the model at the local session so deploy() runs in Local Mode.
mxnet_model.sagemaker_session = sagemaker_local_session
predictor = mxnet_model.deploy(initial_instance_count=1, instance_type='local')
# A single all-zeros MNIST-shaped input; we only check serving works,
# not the prediction values.
data = numpy.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)
finally:
if predictor:
predictor.delete_endpoint()
time.sleep(5)
fcntl.lockf(local_mode_lock, fcntl.LOCK_UN)


def test_mxnet_local_mode(sagemaker_local_session):
local_mode_lock_fd = open(LOCK_PATH, 'w')
local_mode_lock = local_mode_lock_fd.fileno()
Expand Down
51 changes: 50 additions & 1 deletion tests/unit/test_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import pytest
import yaml
from mock import call, patch, Mock
from mock import call, patch, Mock, MagicMock

import sagemaker
from sagemaker.local.image import _SageMakerContainer
Expand Down Expand Up @@ -338,6 +338,42 @@ def test_serve_local_code(up, copy, copytree, tmpdir, sagemaker_session):
assert '%s:/opt/ml/code' % '/tmp/code' in volumes


@patch('sagemaker.local.image._SageMakerContainer._download_file')
@patch('tarfile.is_tarfile')
@patch('tarfile.open', MagicMock())
@patch('os.makedirs', Mock())
def test_prepare_serving_volumes_with_s3_model(is_tarfile, _download_file, sagemaker_session):
    """An s3:// model is downloaded, extracted, and mounted at /opt/ml/model."""
    container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session)
    container.container_root = '/tmp/container_root'
    expected_model_dir = os.path.join('/tmp/container_root/', container.hosts[0], 'model')

    is_tarfile.return_value = True

    volumes = container._prepare_serving_volumes('s3://bucket/my_model.tar.gz')

    # The tarball must be fetched into the host model dir and probed for extraction.
    expected_tar = os.path.join(expected_model_dir, 'my_model.tar.gz')
    _download_file.assert_called_with('bucket', '/my_model.tar.gz', expected_tar)
    is_tarfile.assert_called_with(expected_tar)

    assert len(volumes) == 1
    assert volumes[0].container_dir == '/opt/ml/model'
    assert volumes[0].host_dir == expected_model_dir


@patch('os.makedirs', Mock())
def test_prepare_serving_volumes_with_local_model(sagemaker_session):
    """A local model path is mounted directly at /opt/ml/model, no download."""
    container = _SageMakerContainer('local', 1, 'some-image', sagemaker_session=sagemaker_session)
    container.container_root = '/tmp/container_root'

    volumes = container._prepare_serving_volumes('/path/to/my_model')

    assert len(volumes) == 1
    assert volumes[0].container_dir == '/opt/ml/model'
    assert volumes[0].host_dir == '/path/to/my_model'


@patch('os.makedirs')
def test_download_folder(makedirs):
boto_mock = Mock(name='boto_session')
Expand Down Expand Up @@ -377,6 +413,19 @@ def test_download_folder(makedirs):
obj_mock.download_file.assert_has_calls(calls)


def test_download_file():
    """_download_file strips the leading '/' from the S3 key before downloading."""
    boto_mock = Mock(name='boto_session')
    boto_mock.client('sts').get_caller_identity.return_value = {'Account': '123'}
    bucket_mock = Mock()
    boto_mock.resource('s3').Bucket.return_value = bucket_mock
    session = sagemaker.Session(boto_session=boto_mock, sagemaker_client=Mock())
    container = _SageMakerContainer('local', 2, 'my-image', sagemaker_session=session)

    container._download_file(BUCKET_NAME, '/prefix/path/file.tar.gz', '/tmp/file.tar.gz')

    bucket_mock.download_file.assert_called_with('prefix/path/file.tar.gz', '/tmp/file.tar.gz')


def test_ecr_login_non_ecr():
session_mock = Mock()
sagemaker.local.image._ecr_login_if_needed(session_mock, 'ubuntu')
Expand Down
18 changes: 17 additions & 1 deletion tests/unit/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from sagemaker.predictor import RealTimePredictor
import os
import pytest
from mock import Mock, patch
from mock import MagicMock, Mock, patch

MODEL_DATA = "s3://bucket/model.tar.gz"
MODEL_IMAGE = "mi"
Expand Down Expand Up @@ -115,3 +115,19 @@ def test_deploy_endpoint_name(tfo, time, sagemaker_session):
'InstanceType': INSTANCE_TYPE,
'InitialInstanceCount': 55,
'VariantName': 'AllTraffic'}])


@patch('sagemaker.model.Session')
@patch('sagemaker.model.LocalSession')
@patch('tarfile.open', MagicMock())
def test_deploy_creates_correct_session(local_session, session):
    """deploy() lazily picks LocalSession for local instance types, Session otherwise."""
    # instance_type == 'local' -> a LocalSession is created.
    local_model = DummyFrameworkModel(sagemaker_session=None)
    local_model.deploy(endpoint_name='blah', instance_type='local', initial_instance_count=1)
    assert local_model.sagemaker_session == local_session.return_value

    # Any non-local instance type -> a regular Session is created.
    remote_model = DummyFrameworkModel(sagemaker_session=None)
    remote_model.deploy(endpoint_name='remote_endpoint', instance_type='ml.m4.4xlarge',
                        initial_instance_count=2)
    assert remote_model.sagemaker_session == session.return_value