
Add APIs to export Airflow model config #492


Merged
merged 10 commits on Nov 16, 2018
2 changes: 1 addition & 1 deletion CHANGELOG.rst
@@ -7,7 +7,7 @@ CHANGELOG

* bug-fix: Changes to use correct S3 bucket and time range for dataframes in TrainingJobAnalytics.
* bug-fix: Local Mode: correctly handle the case where the model output folder doesn't exist yet
* feature: Add APIs to export Airflow training and tuning config
* feature: Add APIs to export Airflow training, tuning and model config
* doc-fix: Fix typos in tensorflow serving documentation
* doc-fix: Add estimator base classes to API docs
* feature: HyperparameterTuner: add support for Automatic Model Tuning's Warm Start Jobs
10 changes: 8 additions & 2 deletions src/sagemaker/estimator.py
@@ -288,8 +288,14 @@ def deploy(self, initial_instance_count, instance_type, endpoint_name=None, **kw
@property
def model_data(self):
"""str: The model location in S3. Only set if Estimator has been ``fit()``."""
return self.sagemaker_session.sagemaker_client.describe_training_job(
TrainingJobName=self.latest_training_job.name)['ModelArtifacts']['S3ModelArtifacts']
if self.latest_training_job is not None:
model_uri = self.sagemaker_session.sagemaker_client.describe_training_job(
TrainingJobName=self.latest_training_job.name)['ModelArtifacts']['S3ModelArtifacts']
else:
logging.warning('No finished training job found associated with this estimator. Please make sure '
'this estimator is only used for building workflow config')
model_uri = os.path.join(self.output_path, self._current_job_name, 'output', 'model.tar.gz')
return model_uri
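
For context on the new branch: when the estimator has not been fit, for example when it is only used to generate Airflow workflow configs, `model_data` now falls back to the artifact location derived from `output_path` and the current job name instead of calling `describe_training_job`. A rough illustration of the path that fallback produces, with a hypothetical bucket and job name:

```python
# Illustration only (not part of the diff); bucket and job name are hypothetical.
import os

output_path = 's3://my-bucket/output'
current_job_name = 'my-training-job-2018-11-16'

model_uri = os.path.join(output_path, current_job_name, 'output', 'model.tar.gz')
print(model_uri)  # s3://my-bucket/output/my-training-job-2018-11-16/output/model.tar.gz
```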

@abstractmethod
def create_model(self, **kwargs):
1 change: 1 addition & 0 deletions src/sagemaker/model.py
@@ -166,6 +166,7 @@ def __init__(self, model_data, image, role, entry_point, source_dir=None, predic
self.bucket, self.key_prefix = parse_s3_url(code_location)
else:
self.bucket, self.key_prefix = None, None
self.uploaded_code = None

def prepare_container_def(self, instance_type): # pylint: disable=unused-argument
"""Return a container definition with framework configuration set in model environment variables.
157 changes: 142 additions & 15 deletions src/sagemaker/workflow/airflow.py
@@ -15,37 +15,41 @@
import os

import sagemaker
from sagemaker import job, model, utils
from sagemaker import fw_utils, job, utils, session, vpc_utils
from sagemaker.amazon import amazon_estimator


def prepare_framework(estimator, s3_operations):
"""Prepare S3 operations (specify where to upload source_dir) and environment variables
"""Prepare S3 operations (specify where to upload `source_dir`) and environment variables
related to the framework.
Args:
estimator (sagemaker.estimator.Estimator): The framework estimator to get information from and update.
s3_operations (dict): The dict to specify s3 operations (upload source_dir).
s3_operations (dict): The dict to specify s3 operations (upload `source_dir`).
"""
bucket = estimator.code_location if estimator.code_location else estimator.sagemaker_session._default_bucket
key = '{}/source/sourcedir.tar.gz'.format(estimator._current_job_name)
script = os.path.basename(estimator.entry_point)
if estimator.source_dir and estimator.source_dir.lower().startswith('s3://'):
code_dir = estimator.source_dir
estimator.uploaded_code = fw_utils.UploadedCode(s3_prefix=code_dir, script_name=script)
else:
code_dir = 's3://{}/{}'.format(bucket, key)
estimator.uploaded_code = fw_utils.UploadedCode(s3_prefix=code_dir, script_name=script)
s3_operations['S3Upload'] = [{
'Path': estimator.source_dir or script,
'Bucket': bucket,
'Key': key,
'Tar': True
}]
estimator._hyperparameters[model.DIR_PARAM_NAME] = code_dir
estimator._hyperparameters[model.SCRIPT_PARAM_NAME] = script
estimator._hyperparameters[model.CLOUDWATCH_METRICS_PARAM_NAME] = estimator.enable_cloudwatch_metrics
estimator._hyperparameters[model.CONTAINER_LOG_LEVEL_PARAM_NAME] = estimator.container_log_level
estimator._hyperparameters[model.JOB_NAME_PARAM_NAME] = estimator._current_job_name
estimator._hyperparameters[model.SAGEMAKER_REGION_PARAM_NAME] = estimator.sagemaker_session.boto_region_name
estimator._hyperparameters[sagemaker.model.DIR_PARAM_NAME] = code_dir
estimator._hyperparameters[sagemaker.model.SCRIPT_PARAM_NAME] = script
estimator._hyperparameters[sagemaker.model.CLOUDWATCH_METRICS_PARAM_NAME] = \
estimator.enable_cloudwatch_metrics
estimator._hyperparameters[sagemaker.model.CONTAINER_LOG_LEVEL_PARAM_NAME] = estimator.container_log_level
estimator._hyperparameters[sagemaker.model.JOB_NAME_PARAM_NAME] = estimator._current_job_name
estimator._hyperparameters[sagemaker.model.SAGEMAKER_REGION_PARAM_NAME] = \
estimator.sagemaker_session.boto_region_name
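
For reference, the assignments above put the script-mode settings into the estimator's hyperparameters so the training config can carry them. A rough sketch of the resulting entries, with hypothetical values; the key names are an assumption about what the `sagemaker.model` constants resolve to:

```python
# Hypothetical example of the hyperparameters prepare_framework adds; the keys assume the
# usual script-mode names behind the sagemaker.model constants referenced in the diff.
framework_hyperparameters = {
    'sagemaker_submit_directory': 's3://my-bucket/my-job/source/sourcedir.tar.gz',
    'sagemaker_program': 'train.py',
    'sagemaker_enable_cloudwatch_metrics': False,
    'sagemaker_container_log_level': 20,  # logging.INFO
    'sagemaker_job_name': 'my-job',
    'sagemaker_region': 'us-west-2',
}
```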


def prepare_amazon_algorithm_estimator(estimator, inputs, mini_batch_size=None):
@@ -102,8 +106,8 @@ def training_base_config(estimator, inputs=None, job_name=None, mini_batch_size=
mini_batch_size (int): Specify this argument only when estimator is a built-in estimator of an
Amazon algorithm. For other estimators, batch size should be specified in the estimator.
Returns (dict):
Training config that can be directly used by SageMakerTrainingOperator in Airflow.
Returns:
dict: Training config that can be directly used by SageMakerTrainingOperator in Airflow.
"""
default_bucket = estimator.sagemaker_session.default_bucket()
s3_operations = {}
@@ -181,8 +185,8 @@ def training_config(estimator, inputs=None, job_name=None, mini_batch_size=None)
mini_batch_size (int): Specify this argument only when estimator is a built-in estimator of an
Amazon algorithm. For other estimators, batch size should be specified in the estimator.
Returns (dict):
Training config that can be directly used by SageMakerTrainingOperator in Airflow.
Returns:
dict: Training config that can be directly used by SageMakerTrainingOperator in Airflow.
"""

train_config = training_base_config(estimator, inputs, job_name, mini_batch_size)
@@ -219,8 +223,8 @@ def tuning_config(tuner, inputs, job_name=None):
job_name (str): Specify a tuning job name if needed.
Returns (dict):
Tuning config that can be directly used by SageMakerTuningOperator in Airflow.
Returns:
dict: Tuning config that can be directly used by SageMakerTuningOperator in Airflow.
"""
train_config = training_base_config(tuner.estimator, inputs)
hyperparameters = train_config.pop('HyperParameters', None)
@@ -269,3 +273,126 @@ def tuning_config(tuner, inputs, job_name=None):
tune_config['S3Operations'] = s3_operations

return tune_config


def prepare_framework_container_def(model, instance_type, s3_operations):
"""Prepare the framework model container information. Specify related S3 operations for Airflow to perform.
(Upload `source_dir`)
Args:
model (sagemaker.model.FrameworkModel): The framework model
instance_type (str): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'.
s3_operations (dict): The dict to specify S3 operations (upload `source_dir`).
Returns:
dict: The container information of this framework model.
"""
deploy_image = model.image
if not deploy_image:
region_name = model.sagemaker_session.boto_session.region_name
deploy_image = fw_utils.create_image_uri(
region_name, model.__framework_name__, instance_type, model.framework_version, model.py_version)

base_name = utils.base_name_from_image(deploy_image)
model.name = model.name or utils.airflow_name_from_base(base_name)

bucket = model.bucket or model.sagemaker_session._default_bucket
script = os.path.basename(model.entry_point)
key = '{}/source/sourcedir.tar.gz'.format(model.name)

if model.source_dir and model.source_dir.lower().startswith('s3://'):
model.uploaded_code = fw_utils.UploadedCode(s3_prefix=model.source_dir, script_name=script)
else:
code_dir = 's3://{}/{}'.format(bucket, key)
model.uploaded_code = fw_utils.UploadedCode(s3_prefix=code_dir, script_name=script)
s3_operations['S3Upload'] = [{
'Path': model.source_dir or script,
'Bucket': bucket,
'Key': key,
'Tar': True
}]

deploy_env = dict(model.env)
deploy_env.update(model._framework_env_vars())

try:
if model.model_server_workers:
deploy_env[sagemaker.model.MODEL_SERVER_WORKERS_PARAM_NAME.upper()] = str(model.model_server_workers)
except AttributeError:
Contributor: is there no better way to check for this?

Contributor Author: Yep, because model_server_workers is not an attribute of the FrameworkModel class, accessing model.model_server_workers raises an exception when the model is a FrameworkModel but not a Deep Learning framework model. And at this point in the code we don't know whether it's a plain FrameworkModel or a Deep Learning framework model.

# This applies to a FrameworkModel which is not a SageMaker Deep Learning Framework Model
pass

return sagemaker.container_def(deploy_image, model.model_data, deploy_env)
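
As a footnote to the review exchange above: the guard could equivalently be written with `getattr` instead of try/except. A sketch only, not part of the change, placed inside `prepare_framework_container_def`:

```python
# Sketch of an equivalent check: FrameworkModel has no model_server_workers attribute,
# so fall back to None when it is absent rather than catching AttributeError.
model_server_workers = getattr(model, 'model_server_workers', None)
if model_server_workers:
    deploy_env[sagemaker.model.MODEL_SERVER_WORKERS_PARAM_NAME.upper()] = str(model_server_workers)
```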


def model_config(instance_type, model, role=None, image=None):
"""Export Airflow model config from a SageMaker model
Args:
instance_type (str): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'
model (sagemaker.model.FrameworkModel): The SageMaker model to export Airflow config from
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the model
image (str): A container image to use for deploying the model
Returns:
dict: Model config that can be directly used by SageMakerModelOperator in Airflow. It can also be part
of the config used by SageMakerEndpointOperator and SageMakerTransformOperator in Airflow.
"""
s3_operations = {}
model.image = image or model.image

if isinstance(model, sagemaker.model.FrameworkModel):
container_def = prepare_framework_container_def(model, instance_type, s3_operations)
else:
container_def = model.prepare_container_def(instance_type)
base_name = utils.base_name_from_image(container_def['Image'])
model.name = model.name or utils.airflow_name_from_base(base_name)

primary_container = session._expand_container_def(container_def)

config = {
'ModelName': model.name,
'PrimaryContainer': primary_container,
'ExecutionRoleArn': role or model.role
}

if model.vpc_config:
config['VpcConfig'] = model.vpc_config

if s3_operations:
config['S3Operations'] = s3_operations

return config
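
A minimal usage sketch for `model_config`: build the config from a framework model and hand it to Airflow's `SageMakerModelOperator`. The operator import path and its `config` argument follow the Airflow 1.10 contrib package, and the bucket, role, and script names are placeholders:

```python
# Usage sketch (assumptions: Airflow 1.10 contrib operator, placeholder bucket/role/script).
import sagemaker
from airflow.contrib.operators.sagemaker_model_operator import SageMakerModelOperator
from sagemaker.mxnet.model import MXNetModel
from sagemaker.workflow.airflow import model_config

mxnet_model = MXNetModel(model_data='s3://my-bucket/model/model.tar.gz',
                         role='SageMakerRole',
                         entry_point='inference.py',
                         sagemaker_session=sagemaker.Session())

config = model_config(instance_type='ml.m4.xlarge', model=mxnet_model)

create_model = SageMakerModelOperator(task_id='create_sagemaker_model',
                                      config=config)  # attach to a DAG as usual
```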


def model_config_from_estimator(instance_type, estimator, role=None, image=None, model_server_workers=None,
vpc_config_override=vpc_utils.VPC_CONFIG_DEFAULT):
"""Export Airflow model config from a SageMaker estimator
Args:
instance_type (str): The EC2 instance type to deploy this Model to. For example, 'ml.p2.xlarge'
estimator (sagemaker.estimator.EstimatorBase): The SageMaker estimator to export Airflow config from.
It has to be an estimator associated with a training job.
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the model
image (str): A container image to use for deploying the model
model_server_workers (int): The number of worker processes used by the inference server.
If None, the server will use one worker per vCPU. Only effective when the estimator is
a SageMaker framework estimator.
vpc_config_override (dict[str, list[str]]): Override for VpcConfig set on the model.
Default: use subnets and security groups from this Estimator.
* 'Subnets' (list[str]): List of subnet ids.
* 'SecurityGroupIds' (list[str]): List of security group ids.
Returns:
dict: Model config that can be directly used by SageMakerModelOperator in Airflow. It can also be part
of the config used by SageMakerEndpointOperator and SageMakerTransformOperator in Airflow.
"""
if isinstance(estimator, sagemaker.estimator.Estimator):
model = estimator.create_model(role=role, image=image, vpc_config_override=vpc_config_override)
elif isinstance(estimator, sagemaker.amazon.amazon_estimator.AmazonAlgorithmEstimatorBase):
model = estimator.create_model(vpc_config_override=vpc_config_override)
elif isinstance(estimator, sagemaker.estimator.Framework):
model = estimator.create_model(model_server_workers=model_server_workers, role=role,
vpc_config_override=vpc_config_override)

return model_config(instance_type, model, role, image)
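
And a sketch of the estimator path: as suggested by the `model_data` fallback earlier in this diff, `model_config_from_estimator` can also be used on an estimator that has not finished a training job, provided the training config has been generated first. Framework version, bucket, role, and entry point below are placeholders:

```python
# Usage sketch (placeholder bucket/role/entry point; framework_version is illustrative).
from sagemaker.tensorflow import TensorFlow
from sagemaker.workflow.airflow import training_config, model_config_from_estimator

estimator = TensorFlow(entry_point='train.py',
                       role='SageMakerRole',
                       train_instance_count=1,
                       train_instance_type='ml.p2.xlarge',
                       framework_version='1.11.0',
                       training_steps=1000,
                       evaluation_steps=100)

# training_config prepares the estimator (job name, source upload), so the model config
# can reference model_data even though no training job has finished yet.
train_conf = training_config(estimator, inputs='s3://my-bucket/training-data')
model_conf = model_config_from_estimator(instance_type='ml.c4.xlarge',
                                         estimator=estimator,
                                         role='SageMakerRole')
```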