Skip to content

breaking: make instance_type optional for Airflow model configs #1627

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sagemaker/cli/compatibility/v2/ast_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
modifiers.tf_legacy_mode.TensorBoardParameterRemover(),
modifiers.deprecated_params.TensorFlowScriptModeParameterRemover(),
modifiers.tfs.TensorFlowServingConstructorRenamer(),
modifiers.airflow.ModelConfigArgModifier(),
]

IMPORT_MODIFIERS = [modifiers.tfs.TensorFlowServingImportRenamer()]
Expand Down
1 change: 1 addition & 0 deletions src/sagemaker/cli/compatibility/v2/modifiers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import absolute_import

from sagemaker.cli.compatibility.v2.modifiers import ( # noqa: F401 (imported but unused)
airflow,
deprecated_params,
framework_version,
tf_legacy_mode,
Expand Down
80 changes: 80 additions & 0 deletions src/sagemaker/cli/compatibility/v2/modifiers/airflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""A class to handle argument changes for Airflow functions."""
from __future__ import absolute_import

import ast

from sagemaker.cli.compatibility.v2.modifiers.modifier import Modifier


class ModelConfigArgModifier(Modifier):
"""A class to handle argument changes for Airflow model config functions."""

FUNCTION_NAMES = ("model_config", "model_config_from_estimator")

def node_should_be_modified(self, node):
"""Checks if the ``ast.Call`` node creates an Airflow model config and
contains positional arguments.

This looks for the following formats:

- ``model_config``
- ``airflow.model_config``
- ``workflow.airflow.model_config``
- ``sagemaker.workflow.airflow.model_config``

where ``model_config`` is either ``model_config`` or ``model_config_from_estimator``.

Args:
node (ast.Call): a node that represents a function call. For more,
see https://docs.python.org/3/library/ast.html#abstract-grammar.

Returns:
bool: If the ``ast.Call`` is either a ``model_config`` call or
a ``model_config_from_estimator`` call and has positional arguments.
"""
return self._is_model_config_call(node) and len(node.args) > 0

def _is_model_config_call(self, node):
"""Checks if the node is a ``model_config`` or ``model_config_from_estimator`` call."""
if isinstance(node.func, ast.Name):
return node.func.id in self.FUNCTION_NAMES

if not (isinstance(node.func, ast.Attribute) and node.func.attr in self.FUNCTION_NAMES):
return False

return self._is_in_module(node.func, "sagemaker.workflow.airflow".split("."))

def _is_in_module(self, node, module):
"""Checks if the node is in the module, including partial matches to the module path."""
if isinstance(node.value, ast.Name):
return node.value.id == module[-1]

if isinstance(node.value, ast.Attribute) and node.value.attr == module[-1]:
return self._is_in_module(node.value, module[:-1])

return False

def modify_node(self, node):
"""Modifies the ``ast.Call`` node's arguments.

The first argument, the instance type, is turned into a keyword arg,
leaving the second argument, the model, to be the first argument.

Args:
node (ast.Call): a node that represents either a ``model_config`` call or
a ``model_config_from_estimator`` call.
"""
instance_type = node.args.pop(0)
node.keywords.append(ast.keyword(arg="instance_type", value=instance_type))
23 changes: 11 additions & 12 deletions src/sagemaker/workflow/airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,29 +557,28 @@ def prepare_framework_container_def(model, instance_type, s3_operations):
return sagemaker.container_def(deploy_image, model.model_data, deploy_env)


def model_config(instance_type, model, role=None, image=None):
def model_config(model, instance_type=None, role=None, image=None):
"""Export Airflow model config from a SageMaker model

Args:
model (sagemaker.model.Model): The Model object from which to export the Airflow config
instance_type (str): The EC2 instance type to deploy this Model to. For
example, 'ml.p2.xlarge'
model (sagemaker.model.FrameworkModel): The SageMaker model to export
Airflow config from
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the model
image (str): An container image to use for deploying the model

Returns:
dict: Model config that can be directly used by SageMakerModelOperator
in Airflow. It can also be part of the config used by
SageMakerEndpointOperator and SageMakerTransformOperator in Airflow.
in Airflow. It can also be part of the config used by
SageMakerEndpointOperator and SageMakerTransformOperator in Airflow.
"""
s3_operations = {}
model.image = image or model.image

if isinstance(model, sagemaker.model.FrameworkModel):
container_def = prepare_framework_container_def(model, instance_type, s3_operations)
else:
container_def = model.prepare_container_def(instance_type)
container_def = model.prepare_container_def()
base_name = utils.base_name_from_image(container_def["Image"])
model.name = model.name or utils.name_from_base(base_name)

Expand All @@ -601,10 +600,10 @@ def model_config(instance_type, model, role=None, image=None):


def model_config_from_estimator(
instance_type,
estimator,
task_id,
task_type,
instance_type=None,
role=None,
image=None,
name=None,
Expand All @@ -614,8 +613,6 @@ def model_config_from_estimator(
"""Export Airflow model config from a SageMaker estimator

Args:
instance_type (str): The EC2 instance type to deploy this Model to. For
example, 'ml.p2.xlarge'
estimator (sagemaker.model.EstimatorBase): The SageMaker estimator to
export Airflow config from. It has to be an estimator associated
with a training job.
Expand All @@ -627,6 +624,8 @@ def model_config_from_estimator(
task_type (str): Whether the task is from SageMakerTrainingOperator or
SageMakerTuningOperator. Values can be 'training', 'tuning' or None
(which means training job is not from any task).
instance_type (str): The EC2 instance type to deploy this Model to. For
example, 'ml.p2.xlarge'
role (str): The ``ExecutionRoleArn`` IAM Role ARN for the model
image (str): An container image to use for deploying the model
name (str): Name of the model
Expand Down Expand Up @@ -667,7 +666,7 @@ def model_config_from_estimator(
)
model.name = name

return model_config(instance_type, model, role, image)
return model_config(model, instance_type, role, image)


def transform_config(
Expand Down Expand Up @@ -914,10 +913,10 @@ def transform_config_from_estimator(
SageMakerTransformOperator in Airflow.
"""
model_base_config = model_config_from_estimator(
instance_type=instance_type,
estimator=estimator,
task_id=task_id,
task_type=task_type,
instance_type=instance_type,
role=role,
image=image,
name=model_name,
Expand Down Expand Up @@ -997,7 +996,7 @@ def deploy_config(model, initial_instance_count, instance_type, endpoint_name=No
dict: Deploy config that can be directly used by
SageMakerEndpointOperator in Airflow.
"""
model_base_config = model_config(instance_type, model)
model_base_config = model_config(model, instance_type)

production_variant = sagemaker.production_variant(
model.name, instance_type, initial_instance_count
Expand Down
2 changes: 1 addition & 1 deletion tests/integ/test_airflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,7 @@ def _build_airflow_workflow(estimator, instance_type, inputs=None, mini_batch_si
model = estimator.create_model()
assert model is not None

model_config = sm_airflow.model_config(instance_type, model)
model_config = sm_airflow.model_config(model, instance_type)
assert model_config is not None

transform_config = sm_airflow.transform_config_from_estimator(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import

import pasta

from sagemaker.cli.compatibility.v2.modifiers import airflow
from tests.unit.sagemaker.cli.compatibility.v2.modifiers.ast_converter import ast_call


def test_node_should_be_modified_model_config_with_args():
model_config_calls = (
"model_config(instance_type, model)",
"airflow.model_config(instance_type, model)",
"workflow.airflow.model_config(instance_type, model)",
"sagemaker.workflow.airflow.model_config(instance_type, model)",
"model_config_from_estimator(instance_type, model)",
"airflow.model_config_from_estimator(instance_type, model)",
"workflow.airflow.model_config_from_estimator(instance_type, model)",
"sagemaker.workflow.airflow.model_config_from_estimator(instance_type, model)",
)

modifier = airflow.ModelConfigArgModifier()

for call in model_config_calls:
node = ast_call(call)
assert modifier.node_should_be_modified(node) is True


def test_node_should_be_modified_model_config_without_args():
model_config_calls = (
"model_config()",
"airflow.model_config()",
"workflow.airflow.model_config()",
"sagemaker.workflow.airflow.model_config()",
"model_config_from_estimator()",
"airflow.model_config_from_estimator()",
"workflow.airflow.model_config_from_estimator()",
"sagemaker.workflow.airflow.model_config_from_estimator()",
)

modifier = airflow.ModelConfigArgModifier()

for call in model_config_calls:
node = ast_call(call)
assert modifier.node_should_be_modified(node) is False


def test_node_should_be_modified_random_function_call():
node = ast_call("sagemaker.workflow.airflow.prepare_framework_container_def()")
modifier = airflow.ModelConfigArgModifier()
assert modifier.node_should_be_modified(node) is False


def test_modify_node():
model_config_calls = (
("model_config(instance_type, model)", "model_config(model, instance_type=instance_type)"),
(
"model_config('ml.m4.xlarge', 'my-model')",
"model_config('my-model', instance_type='ml.m4.xlarge')",
),
(
"model_config('ml.m4.xlarge', model='my-model')",
"model_config(instance_type='ml.m4.xlarge', model='my-model')",
),
(
"model_config_from_estimator(instance_type, estimator, task_id, task_type)",
"model_config_from_estimator(estimator, task_id, task_type, instance_type=instance_type)",
),
(
"model_config_from_estimator(instance_type, estimator, task_id=task_id, task_type=task_type)",
"model_config_from_estimator(estimator, instance_type=instance_type, task_id=task_id, task_type=task_type)",
),
)

modifier = airflow.ModelConfigArgModifier()

for call, expected in model_config_calls:
node = ast_call(call)
modifier.modify_node(node)
assert expected == pasta.dump(node)
12 changes: 6 additions & 6 deletions tests/unit/test_airflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ def test_byo_model_config(sagemaker_session):
sagemaker_session=sagemaker_session,
)

config = airflow.model_config(instance_type="ml.c4.xlarge", model=byo_model)
config = airflow.model_config(model=byo_model)
expected_config = {
"ModelName": "model",
"PrimaryContainer": {
Expand All @@ -926,7 +926,7 @@ def test_byo_framework_model_config(sagemaker_session):
sagemaker_session=sagemaker_session,
)

config = airflow.model_config(instance_type="ml.c4.xlarge", model=byo_model)
config = airflow.model_config(model=byo_model, instance_type="ml.c4.xlarge")
expected_config = {
"ModelName": "model",
"PrimaryContainer": {
Expand Down Expand Up @@ -971,7 +971,7 @@ def test_framework_model_config(sagemaker_session):
sagemaker_session=sagemaker_session,
)

config = airflow.model_config(instance_type="ml.c4.xlarge", model=chainer_model)
config = airflow.model_config(model=chainer_model, instance_type="ml.c4.xlarge")
expected_config = {
"ModelName": "sagemaker-chainer-%s" % TIME_STAMP,
"PrimaryContainer": {
Expand Down Expand Up @@ -1009,7 +1009,7 @@ def test_amazon_alg_model_config(sagemaker_session):
model_data="{{ model_data }}", role="{{ role }}", sagemaker_session=sagemaker_session
)

config = airflow.model_config(instance_type="ml.c4.xlarge", model=pca_model)
config = airflow.model_config(model=pca_model)
expected_config = {
"ModelName": "pca-%s" % TIME_STAMP,
"PrimaryContainer": {
Expand Down Expand Up @@ -1059,10 +1059,10 @@ def test_model_config_from_framework_estimator(ecr_prefix, sagemaker_session):
airflow.training_config(mxnet_estimator, data)

config = airflow.model_config_from_estimator(
instance_type="ml.c4.xlarge",
estimator=mxnet_estimator,
task_id="task_id",
task_type="training",
instance_type="ml.c4.xlarge",
)
expected_config = {
"ModelName": "mxnet-inference-%s" % TIME_STAMP,
Expand Down Expand Up @@ -1103,7 +1103,7 @@ def test_model_config_from_amazon_alg_estimator(sagemaker_session):
airflow.training_config(knn_estimator, record, mini_batch_size=256)

config = airflow.model_config_from_estimator(
instance_type="ml.c4.xlarge", estimator=knn_estimator, task_id="task_id", task_type="tuning"
estimator=knn_estimator, task_id="task_id", task_type="tuning"
)
expected_config = {
"ModelName": "knn-%s" % TIME_STAMP,
Expand Down