Skip to content

Add new APIs to clean up resources from predictor and transformer. #630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Feb 13, 2019
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ CHANGELOG

* doc-fix: update information about saving models in the MXNet README
* doc-fix: change ReadTheDocs links from latest to stable
* feature: Support for predictor to delete endpoint, and delete endpoint configuration with ``delete_endpoint()`` by default
* feature: Support for model class to delete SageMaker model
* feature: Support for transformer to delete Sagemaker model

1.18.2
======
Expand Down
23 changes: 16 additions & 7 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,16 @@ Here is an end to end example of how to use a SageMaker Estimator:
# Serializes data and makes a prediction request to the SageMaker endpoint
response = mxnet_predictor.predict(data)

# Tears down the SageMaker endpoint
mxnet_estimator.delete_endpoint()
# Tears down the SageMaker endpoint and endpoint configuration
mxnet_predictor.delete_endpoint()


The example above will eventually delete both the SageMaker endpoint and endpoint configuration through `delete_endpoint()`. If you want to keep your SageMaker endpoint configuration, use the value False for the `delete_endpoint_config` parameter, as shown below.

.. code:: python
# Only delete the endpoint and keep the endpoint endpoint configuration
mxnet_predictor.delete_endpoint(delete_endpoint_config=False)

Additionally, it is possible to deploy a different endpoint configuration, which links to your model, to an already existing SageMaker endpoint.
This can be done by specifying the existing endpoint name for the ``endpoint_name`` parameter along with the ``update_endpoint`` parameter as ``True`` within your ``deploy()`` call.
For more `information <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.update_endpoint>`__.
Expand Down Expand Up @@ -220,8 +226,8 @@ For more `information <https://boto3.amazonaws.com/v1/documentation/api/latest/r
# Serializes data and makes a prediction request to the SageMaker endpoint
response = mxnet_predictor.predict(data)

# Tears down the SageMaker endpoint
mxnet_estimator.delete_endpoint()
# Tears down the SageMaker endpoint and endpoint configuration
mxnet_predictor.delete_endpoint()

Training Metrics
~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -274,8 +280,8 @@ We can take the example in `Using Estimators <#using-estimators>`__ , and use e
# Serializes data and makes a prediction request to the local endpoint
response = mxnet_predictor.predict(data)

# Tears down the endpoint container
mxnet_estimator.delete_endpoint()
# Tears down the endpoint and endpoint configuration
mxnet_predictor.delete_endpoint()


If you have an existing model and want to deploy it locally, don't specify a sagemaker_session argument to the ``MXNetModel`` constructor.
Expand All @@ -297,7 +303,7 @@ Here is an end-to-end example:
data = numpy.zeros(shape=(1, 1, 28, 28))
predictor.predict(data)

# Tear down the endpoint container
# Tear down the endpoint and endpoint configuration
predictor.delete_endpoint()


Expand All @@ -322,6 +328,9 @@ Here is an end-to-end example:
transformer.transform('s3://my/transform/data, content_type='text/csv', split_type='Line')
transformer.wait()

# Deletes the SageMaker model
transformer.delete_model()


For detailed examples of running Docker in local mode, see:

Expand Down
8 changes: 8 additions & 0 deletions src/sagemaker/local/local_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,14 @@ def delete_endpoint(self, EndpointName):
if EndpointName in LocalSagemakerClient._endpoints:
LocalSagemakerClient._endpoints[EndpointName].stop()

def delete_endpoint_config(self, EndpointConfigName):
if EndpointConfigName in LocalSagemakerClient._endpoint_configs:
del LocalSagemakerClient._endpoint_configs[EndpointConfigName]

def delete_model(self, ModelName):
if ModelName in LocalSagemakerClient._models:
del LocalSagemakerClient._models[ModelName]


class LocalSagemakerRuntimeClient(object):
"""A SageMaker Runtime client that calls a local endpoint only.
Expand Down
10 changes: 10 additions & 0 deletions src/sagemaker/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,16 @@ def transformer(self, instance_count, instance_type, strategy=None, assemble_wit
env=env, tags=tags, base_transform_job_name=self.name,
volume_kms_key=volume_kms_key, sagemaker_session=self.sagemaker_session)

def delete_model(self):
"""Delete an Amazon SageMaker ``Model``.

Raises: ValueError if model is not deployed yet.

"""
if self.name is None:
raise ValueError('The SageMaker model must be deployed first before attempting to delete.')
self.sagemaker_session.delete_model(self.name)


SCRIPT_PARAM_NAME = 'sagemaker_program'
DIR_PARAM_NAME = 'sagemaker_submit_directory'
Expand Down
23 changes: 21 additions & 2 deletions src/sagemaker/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,28 @@ def _create_request_args(self, data, initial_args=None):
args['Body'] = data
return args

def delete_endpoint(self):
"""Delete the Amazon SageMaker endpoint backing this predictor.
def _delete_endpoint_config(self):
"""Delete the Amazon SageMaker endpoint configuration
"""
try:
endpoint_description = self.sagemaker_session.sagemaker_client.describe_endpoint(EndpointName=self.endpoint)
endpoint_config_name = endpoint_description['EndpointConfigName']
self.sagemaker_session.delete_endpoint_config(endpoint_config_name)
except Exception:
raise ValueError('The endpoint this config attached to does not exist.')

def delete_endpoint(self, delete_endpoint_config=True):
"""Delete the Amazon SageMaker endpoint backing this predictor. Also delete the endpoint configuration attached
to it if delete_endpoint_config is True.

Args:
delete_endpoint_config (bool): Flag to indicate whether to delete endpoint configuration together with
endpoint. If False, only endpoint will be deleted. Default: True.

"""
if delete_endpoint_config:
self._delete_endpoint_config()

self.sagemaker_session.delete_endpoint(self.endpoint)


Expand Down
25 changes: 25 additions & 0 deletions src/sagemaker/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,31 @@ def delete_endpoint(self, endpoint_name):
LOGGER.info('Deleting endpoint with name: {}'.format(endpoint_name))
self.sagemaker_client.delete_endpoint(EndpointName=endpoint_name)

def delete_endpoint_config(self, endpoint_config_name):
"""Delete an Amazon SageMaker endpoint configuration.

Args:
endpoint_config_name (str): Name of the Amazon SageMaker endpoint configuration to delete.
"""
LOGGER.info('Deleting endpoint configuration with name: {}'.format(endpoint_config_name))
self.sagemaker_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)

def delete_model(self, model_name):
"""Delete an Amazon SageMaker ``Model``.

Args:
model_name (str): Name of the Amazon SageMaker model to delete.

Raises: ValueError if model is not deployed yet.

"""
try:
self.sagemaker_client.describe_model(ModelName=model_name)
LOGGER.info('Deleting model with name: {}'.format(model_name))
self.sagemaker_client.delete_model(ModelName=model_name)
except Exception:
raise ValueError('The Sagemaker model must be deployed first before attempting to delete.')

def wait_for_job(self, job, poll=5):
"""Wait for an Amazon SageMaker training job to complete.

Expand Down
8 changes: 8 additions & 0 deletions src/sagemaker/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ def __init__(self, model_name, instance_count, instance_type, strategy=None, ass
using the default AWS configuration chain.
volume_kms_key (str): Optional. KMS key ID for encrypting the volume attached to the ML
compute instance (default: None).
model (sagemaker.model.Model): A SageMaker Model object, used for SageMaker Model interactions
(default: None). If not specified, model object related activities will fail.
"""
self.model_name = model_name
self.strategy = strategy
Expand Down Expand Up @@ -112,6 +114,12 @@ def transform(self, data, data_type='S3Prefix', content_type=None, compression_t
self.latest_transform_job = _TransformJob.start_new(self, data, data_type, content_type, compression_type,
split_type)

def delete_model(self):
"""Delete a SageMaker Model.

"""
self.sagemaker_session.delete_model(self.model_name)

def _retrieve_image_name(self):
model_desc = self.sagemaker_session.sagemaker_client.describe_model(ModelName=self.model_name)
return model_desc['PrimaryContainer']['Image']
Expand Down
48 changes: 45 additions & 3 deletions tests/integ/test_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from sagemaker.transformer import Transformer
from tests.integ import DATA_DIR, TRAINING_DEFAULT_TIMEOUT_MINUTES, TRANSFORM_DEFAULT_TIMEOUT_MINUTES
from tests.integ.kms_utils import get_or_create_kms_key
from tests.integ.timeout import timeout
from tests.integ.timeout import timeout, timeout_and_delete_model_with_transformer
from tests.integ.vpc_test_utils import get_or_create_vpc_resources


Expand Down Expand Up @@ -56,7 +56,8 @@ def test_transform_mxnet(sagemaker_session, mxnet_full_version):
kms_key_arn = get_or_create_kms_key(kms_client, account_id)

transformer = _create_transformer_and_transform_job(mx, transform_input, kms_key_arn)
with timeout(minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
with timeout_and_delete_model_with_transformer(transformer, sagemaker_session,
minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
transformer.wait()

job_desc = transformer.sagemaker_session.sagemaker_client.describe_transform_job(
Expand Down Expand Up @@ -100,10 +101,51 @@ def test_attach_transform_kmeans(sagemaker_session):

attached_transformer = Transformer.attach(transformer.latest_transform_job.name,
sagemaker_session=sagemaker_session)
with timeout(minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
with timeout_and_delete_model_with_transformer(transformer, sagemaker_session,
minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
attached_transformer.wait()


# def test_transformer_delete_model(sagemaker_session):
# data_path = os.path.join(DATA_DIR, 'one_p_mnist')
# pickle_args = {} if sys.version_info.major == 2 else {'encoding': 'latin1'}
#
# train_set_path = os.path.join(data_path, 'mnist.pkl.gz')
# with gzip.open(train_set_path, 'rb') as f:
# train_set, _, _ = pickle.load(f, **pickle_args)
#
# kmeans = KMeans(role='SageMakerRole', train_instance_count=1,
# train_instance_type='ml.c4.xlarge', k=10, sagemaker_session=sagemaker_session,
# output_path='s3://{}/'.format(sagemaker_session.default_bucket()))
#
# kmeans.init_method = 'random'
# kmeans.max_iterations = 1
# kmeans.tol = 1
# kmeans.num_trials = 1
# kmeans.local_init_method = 'kmeans++'
# kmeans.half_life_time_size = 1
# kmeans.epochs = 1
#
# records = kmeans.record_set(train_set[0][:100])
# with timeout(minutes=TRAINING_DEFAULT_TIMEOUT_MINUTES):
# kmeans.fit(records)
#
# transform_input_path = os.path.join(data_path, 'transform_input.csv')
# transform_input_key_prefix = 'integ-test-data/one_p_mnist/transform'
# transform_input = kmeans.sagemaker_session.upload_data(path=transform_input_path,
# key_prefix=transform_input_key_prefix)
#
# transformer = _create_transformer_and_transform_job(kmeans, transform_input)
# with timeout(minutes=TRANSFORM_DEFAULT_TIMEOUT_MINUTES):
# transformer.wait()
#
# transformer.delete_model()
#
# with pytest.raises(Exception) as exception:
# sagemaker_session.sagemaker_client.describe_model(ModelName=transformer.model_name)
# assert 'Could not find model' in exception.value.message


def test_transform_mxnet_vpc(sagemaker_session, mxnet_full_version):
data_path = os.path.join(DATA_DIR, 'mxnet_mnist')
script_path = os.path.join(data_path, 'mnist.py')
Expand Down
48 changes: 48 additions & 0 deletions tests/integ/timeout.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,54 @@ def timeout_and_delete_endpoint_by_name(endpoint_name, sagemaker_session, second
sleep(10)


@contextmanager
def timeout_and_delete_model_with_transformer(transformer, sagemaker_session, seconds=0, minutes=0, hours=0):
with timeout(seconds=seconds, minutes=minutes, hours=hours) as t:
no_errors = False
try:
yield [t]
no_errors = True
finally:
attempts = 3

while attempts > 0:
attempts -= 1
try:
transformer.delete_model()
LOGGER.info('deleted SageMaker model {}'.format(transformer.model_name))
if no_errors:
_cleanup_model_logs(transformer.model_name, sagemaker_session)
return
except ClientError as ce:
if ce.response['Error']['Code'] == 'ValidationException':
pass
sleep(10)


def _show_model_logs(model_name, sagemaker_session):
log_group = '/aws/sagemaker/Models/{}'.format(model_name)
try:
LOGGER.info('cloudwatch logs for log group {}'.format(log_group))
logs = AWSLogs(log_group_name=log_group, log_stream_name='ALL', start='1d',
aws_region=sagemaker_session.boto_session.region_name)
logs.list_logs()
except Exception:
LOGGER.exception('Failure occurred while listing cloudwatch log group %s. Swallowing exception but printing '
'stacktrace for debugging.', log_group)


def _cleanup_model_logs(model_name, sagemaker_session):
log_group = '/aws/sagemaker/Models/{}'.format(model_name)
try:
LOGGER.info('deleting cloudwatch log group {}:'.format(log_group))
cwl_client = sagemaker_session.boto_session.client('logs')
cwl_client.delete_log_group(logGroupName=log_group)
LOGGER.info('deleted cloudwatch log group: {}'.format(log_group))
except Exception:
LOGGER.exception('Failure occurred while cleaning up cloudwatch log group %s. '
'Swallowing exception but printing stacktrace for debugging.', log_group)


def _show_endpoint_logs(endpoint_name, sagemaker_session):
log_group = '/aws/sagemaker/Endpoints/{}'.format(endpoint_name)
try:
Expand Down
Loading