feature: Inference recommendation id deployment support #3631


Merged: 2 commits, Feb 3, 2023
Changes from all commits
5 changes: 5 additions & 0 deletions src/sagemaker/estimator.py
@@ -1342,6 +1342,7 @@ def deploy(
volume_size=None,
model_data_download_timeout=None,
container_startup_health_check_timeout=None,
inference_recommendation_id=None,
**kwargs,
):
"""Deploy the trained model to an Amazon SageMaker endpoint.
@@ -1419,6 +1420,9 @@ def deploy(
inference container to pass health check by SageMaker Hosting. For more information
about health check see:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html#your-algorithms-inference-algo-ping-requests
inference_recommendation_id (str): The recommendation ID that identifies the
recommendation picked from the results of an inference recommendation job;
the model and endpoint are deployed with the recommended parameters.
**kwargs: Passed to invocation of ``create_model()``.
Implementations may customize ``create_model()`` to accept
``**kwargs`` to customize model creation during deploy.
@@ -1483,6 +1487,7 @@ def deploy(
volume_size=volume_size,
model_data_download_timeout=model_data_download_timeout,
container_startup_health_check_timeout=container_startup_health_check_timeout,
inference_recommendation_id=inference_recommendation_id,
)

def register(
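For context, the estimator-side change simply threads the new keyword through to ``Model.deploy()``; the same parameter is added to ``HuggingFaceModel.deploy()`` and ``TensorFlowModel.deploy()`` below. A minimal usage sketch, where the estimator class, S3 paths, role ARN, and the recommendation ID are all hypothetical placeholders:

# Hypothetical sketch: deploy a trained estimator using a recommendation
# picked from an Inference Recommender job. All names are placeholders.
from sagemaker.pytorch import PyTorch

estimator = PyTorch(
    entry_point="train.py",
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder role
    framework_version="1.12",
    py_version="py38",
    instance_count=1,
    instance_type="ml.m5.xlarge",
)
estimator.fit("s3://my-bucket/training-data")  # placeholder input location

# instance_type and initial_instance_count are omitted on purpose:
# they are filled in from the selected recommendation.
predictor = estimator.deploy(
    inference_recommendation_id="my-inference-rec-job/a1b2c3d4",  # placeholder ID
)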
5 changes: 5 additions & 0 deletions src/sagemaker/huggingface/model.py
@@ -209,6 +209,7 @@ def deploy(
volume_size=None,
model_data_download_timeout=None,
container_startup_health_check_timeout=None,
inference_recommendation_id=None,
**kwargs,
):
"""Deploy this ``Model`` to an ``Endpoint`` and optionally return a ``Predictor``.
@@ -282,6 +283,9 @@ def deploy(
inference container to pass health check by SageMaker Hosting. For more information
about health check see:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html#your-algorithms-inference-algo-ping-requests
inference_recommendation_id (str): The recommendation ID that identifies the
recommendation picked from the results of an inference recommendation job;
the model and endpoint are deployed with the recommended parameters.
Raises:
ValueError: If the argument combination check failed in these circumstances:
- If no role is specified or
@@ -317,6 +321,7 @@ def deploy(
volume_size=volume_size,
model_data_download_timeout=model_data_download_timeout,
container_startup_health_check_timeout=container_startup_health_check_timeout,
inference_recommendation_id=inference_recommendation_id,
)

def register(
209 changes: 198 additions & 11 deletions src/sagemaker/inference_recommender/inference_recommender_mixin.py
@@ -14,11 +14,10 @@
from __future__ import absolute_import

import logging
import re

from typing import List, Dict, Optional

import sagemaker

from sagemaker.parameter import CategoricalParameter

INFERENCE_RECOMMENDER_FRAMEWORK_MAPPING = {
@@ -101,13 +100,15 @@ def right_size(
'OMP_NUM_THREADS': CategoricalParameter(['1', '2', '3', '4'])
}]

phases (list[Phase]): Specifies the criteria for increasing load
during endpoint load tests. (default: None).
traffic_type (str): Specifies the traffic type that matches the phases. (default: None).
max_invocations (str): defines invocation limit for endpoint load tests (default: None).
model_latency_thresholds (list[ModelLatencyThreshold]): defines the response latency
thresholds for endpoint load tests (default: None).
max_tests (int): restricts how many endpoints are allowed to be
phases (list[Phase]): Shape of the traffic pattern to use in the load test
(default: None).
traffic_type (str): Specifies the traffic pattern type. Currently only supports
one type 'PHASES' (default: None).
max_invocations (str): defines the minimum invocations per minute for the endpoint
to support (default: None).
model_latency_thresholds (list[ModelLatencyThreshold]): defines the maximum response
latency for endpoints to support (default: None).
max_tests (int): restricts how many endpoints in total are allowed to be
spun up for this job (default: None).
max_parallel_tests (int): restricts how many concurrent endpoints
this job is allowed to spin up (default: None).
@@ -122,7 +123,7 @@ def right_size(
raise ValueError("right_size() is currently only supported with a registered model")

if not framework and self._framework():
framework = INFERENCE_RECOMMENDER_FRAMEWORK_MAPPING.get(self._framework, framework)
framework = INFERENCE_RECOMMENDER_FRAMEWORK_MAPPING.get(self._framework(), framework)

framework_version = self._get_framework_version()

@@ -176,7 +177,38 @@ def right_size(

return self
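The reworked parameter documentation above corresponds to an advanced ``right_size()`` call shaped roughly like the sketch below. This assumes ``Phase`` and ``ModelLatencyThreshold`` are exported from ``sagemaker.inference_recommender`` and that ``model`` is a registered model package; the payload URL, instance types, and limits are placeholders.

# Sketch of an advanced right_size() load test; all values are placeholders.
from sagemaker.inference_recommender import Phase, ModelLatencyThreshold  # assumed export
from sagemaker.parameter import CategoricalParameter

model.right_size(
    sample_payload_url="s3://my-bucket/payload.tar.gz",
    supported_content_types=["application/json"],
    supported_instance_types=["ml.c5.xlarge", "ml.c5.2xlarge"],
    framework="PYTORCH",
    job_duration_in_seconds=7200,
    hyperparameter_ranges=[
        {
            "instance_types": CategoricalParameter(["ml.c5.xlarge", "ml.c5.2xlarge"]),
            "OMP_NUM_THREADS": CategoricalParameter(["1", "2", "4"]),
        }
    ],
    # Two traffic phases: warm up with 1 user, then ramp to 4 users.
    phases=[
        Phase(duration_in_seconds=300, initial_number_of_users=1, spawn_rate=1),
        Phase(duration_in_seconds=300, initial_number_of_users=4, spawn_rate=1),
    ],
    traffic_type="PHASES",
    max_invocations=100,  # endpoint should sustain at least 100 invocations/min
    model_latency_thresholds=[
        ModelLatencyThreshold(percentile="P95", value_in_milliseconds=100)
    ],
    max_tests=5,           # at most 5 endpoints spun up in total
    max_parallel_tests=2,  # at most 2 endpoints at a time
)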

def _check_inference_recommender_args(
def _update_params(
self,
**kwargs,
):
"""Check and update params based on inference recommendation id or right size case"""
instance_type = kwargs["instance_type"]
initial_instance_count = kwargs["initial_instance_count"]
accelerator_type = kwargs["accelerator_type"]
async_inference_config = kwargs["async_inference_config"]
serverless_inference_config = kwargs["serverless_inference_config"]
inference_recommendation_id = kwargs["inference_recommendation_id"]
inference_recommender_job_results = kwargs["inference_recommender_job_results"]
inference_recommendation = None  # stays None if neither recommendation source applies
if inference_recommendation_id is not None:
inference_recommendation = self._update_params_for_recommendation_id(
instance_type=instance_type,
initial_instance_count=initial_instance_count,
accelerator_type=accelerator_type,
async_inference_config=async_inference_config,
serverless_inference_config=serverless_inference_config,
inference_recommendation_id=inference_recommendation_id,
)
elif inference_recommender_job_results is not None:
inference_recommendation = self._update_params_for_right_size(
instance_type,
initial_instance_count,
accelerator_type,
serverless_inference_config,
async_inference_config,
)
return inference_recommendation or (instance_type, initial_instance_count)

def _update_params_for_right_size(
self,
instance_type=None,
initial_instance_count=None,
Expand Down Expand Up @@ -232,6 +264,161 @@ def _check_inference_recommender_args(
]
return (instance_type, initial_instance_count)

def _update_params_for_recommendation_id(
self,
instance_type,
initial_instance_count,
accelerator_type,
async_inference_config,
serverless_inference_config,
inference_recommendation_id,
):
"""Update parameters with inference recommendation results.

Args:
instance_type (str): The EC2 instance type to deploy this Model to.
For example, 'ml.p2.xlarge', or 'local' for local mode. If not using
serverless inference, then it is required to deploy a model.
initial_instance_count (int): The initial number of instances to run
in the ``Endpoint`` created from this ``Model``. If not using
serverless inference, then it needs to be a number larger than or
equal to 1.
accelerator_type (str): Type of Elastic Inference accelerator to
deploy this model for model loading and inference, for example,
'ml.eia1.medium'. If not specified, no Elastic Inference
accelerator will be attached to the endpoint. For more
information:
https://docs.aws.amazon.com/sagemaker/latest/dg/ei.html
async_inference_config (sagemaker.async_inference.AsyncInferenceConfig): Specifies
configuration related to an async endpoint. Use this configuration when
creating an async endpoint for async inference. If an empty config object is
passed through, a default config is used to deploy the async endpoint. A
real-time endpoint is deployed if it's None.
serverless_inference_config (sagemaker.serverless.ServerlessInferenceConfig):
Specifies configuration related to a serverless endpoint. Use this
configuration when creating a serverless endpoint for serverless inference.
If an empty object is passed through, the pre-defined values in the
``ServerlessInferenceConfig`` class are used to deploy the serverless
endpoint. An instance-based endpoint is deployed if it's None.
inference_recommendation_id (str): The recommendation ID that identifies
the recommendation picked from the results of an inference
recommendation job; the model and endpoint are deployed with the
recommended parameters.
Raises:
ValueError: If the argument combination check failed in these circumstances:
- If only one of instance type or instance count is specified, or
- If the recommendation id does not follow the required format, or
- If the recommendation id is not valid, or
- If the inference recommendation id is specified along with incompatible parameters
Returns:
(str, int): The instance type and associated instance count from the selected
inference recommendation id, if the argument combination check passed.
"""

if instance_type is not None and initial_instance_count is not None:
LOGGER.warning(
"Both instance_type and initial_instance_count are specified,"
"overriding the recommendation result."
)
return (instance_type, initial_instance_count)

# Validate parameters that are incompatible with the recommendation id
if bool(instance_type) != bool(initial_instance_count):
raise ValueError(
"Please either do not specify instance_type and initial_instance_count"
"since they are in recommendation, or specify both of them if you want"
"to override the recommendation."
)
if accelerator_type is not None:
raise ValueError("accelerator_type is not compatible with inference_recommendation_id.")
if async_inference_config is not None:
raise ValueError(
"async_inference_config is not compatible with inference_recommendation_id."
)
if serverless_inference_config is not None:
raise ValueError(
"serverless_inference_config is not compatible with inference_recommendation_id."
)

# Validate recommendation id
if not re.match(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,63}\/\w{8}$", inference_recommendation_id):
raise ValueError("Inference Recommendation id is not valid")
recommendation_job_name = inference_recommendation_id.split("/")[0]

sage_client = self.sagemaker_session.sagemaker_client
recommendation_res = sage_client.describe_inference_recommendations_job(
JobName=recommendation_job_name
)
input_config = recommendation_res["InputConfig"]

recommendation = next(
(
rec
for rec in recommendation_res["InferenceRecommendations"]
if rec["RecommendationId"] == inference_recommendation_id
),
None,
)

if not recommendation:
raise ValueError(
"inference_recommendation_id does not exist in InferenceRecommendations list"
)

model_config = recommendation["ModelConfiguration"]
envs = (
model_config["EnvironmentParameters"]
if "EnvironmentParameters" in model_config
else None
)
# Update envs
recommend_envs = {}
if envs is not None:
for env in envs:
recommend_envs[env["Key"]] = env["Value"]
self.env.update(recommend_envs)

# Update params with non-compilation recommendation results
if (
"InferenceSpecificationName" not in model_config
and "CompilationJobName" not in model_config
):

if "ModelPackageVersionArn" in input_config:
modelpkg_res = sage_client.describe_model_package(
ModelPackageName=input_config["ModelPackageVersionArn"]
)
self.model_data = modelpkg_res["InferenceSpecification"]["Containers"][0][
"ModelDataUrl"
]
self.image_uri = modelpkg_res["InferenceSpecification"]["Containers"][0]["Image"]
elif "ModelName" in input_config:
model_res = sage_client.describe_model(ModelName=input_config["ModelName"])
self.model_data = model_res["PrimaryContainer"]["ModelDataUrl"]
self.image_uri = model_res["PrimaryContainer"]["Image"]
else:
if "InferenceSpecificationName" in model_config:
modelpkg_res = sage_client.describe_model_package(
ModelPackageName=input_config["ModelPackageVersionArn"]
)
self.model_data = modelpkg_res["AdditionalInferenceSpecificationDefinition"][
"Containers"
][0]["ModelDataUrl"]
self.image_uri = modelpkg_res["AdditionalInferenceSpecificationDefinition"][
"Containers"
][0]["Image"]
elif "CompilationJobName" in model_config:
compilation_res = sage_client.describe_compilation_job(
CompilationJobName=model_config["CompilationJobName"]
)
self.model_data = compilation_res["ModelArtifacts"]["S3ModelArtifacts"]
self.image_uri = compilation_res["InferenceImage"]

instance_type = recommendation["EndpointConfiguration"]["InstanceType"]
initial_instance_count = recommendation["EndpointConfiguration"]["InitialInstanceCount"]

return (instance_type, initial_instance_count)
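The format check near the top of this method accepts IDs of the form ``<job-name>/<8-character-suffix>``; the job name is then used to look up the Inference Recommender job. A quick illustration with a made-up ID:

import re

# Made-up recommendation ID: "<job-name>/<8-char suffix>", per the regex above.
rec_id = "my-inference-rec-job/a1b2c3d4"
assert re.match(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,63}\/\w{8}$", rec_id)
job_name = rec_id.split("/")[0]  # "my-inference-rec-job", passed to DescribeInferenceRecommendationsJob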

def _convert_to_endpoint_configurations_json(
self, hyperparameter_ranges: List[Dict[str, CategoricalParameter]]
):
32 changes: 20 additions & 12 deletions src/sagemaker/model.py
@@ -1035,6 +1035,7 @@ def deploy(
volume_size=None,
model_data_download_timeout=None,
container_startup_health_check_timeout=None,
inference_recommendation_id=None,
**kwargs,
):
"""Deploy this ``Model`` to an ``Endpoint`` and optionally return a ``Predictor``.
@@ -1110,31 +1111,24 @@
inference container to pass health check by SageMaker Hosting. For more information
about health check see:
https://docs.aws.amazon.com/sagemaker/latest/dg/your-algorithms-inference-code.html#your-algorithms-inference-algo-ping-requests
inference_recommendation_id (str): The recommendation ID that identifies the
recommendation picked from the results of an inference recommendation job;
the model and endpoint are deployed with the recommended parameters.
Raises:
ValueError: If the argument combination check failed in these circumstances:
- If no role is specified or
- If serverless inference config is not specified and instance type and instance
count are also not specified or
- If a wrong type of object is provided as serverless inference config or async
inference config
inference config or
- If inference recommendation id is specified along with incompatible parameters
Returns:
callable[string, sagemaker.session.Session] or None: Invocation of
``self.predictor_cls`` on the created endpoint name, if ``self.predictor_cls``
is not None. Otherwise, return None.
"""
removed_kwargs("update_endpoint", kwargs)

if self.inference_recommender_job_results:
inference_recommendation = self._check_inference_recommender_args(
instance_type,
initial_instance_count,
accelerator_type,
serverless_inference_config,
async_inference_config,
)
if inference_recommendation:
instance_type, initial_instance_count = inference_recommendation

self._init_sagemaker_session_if_does_not_exist(instance_type)

tags = add_jumpstart_tags(
@@ -1144,6 +1138,20 @@
if self.role is None:
raise ValueError("Role can not be null for deploying a model")

if (
inference_recommendation_id is not None
or self.inference_recommender_job_results is not None
):
instance_type, initial_instance_count = self._update_params(
instance_type=instance_type,
initial_instance_count=initial_instance_count,
accelerator_type=accelerator_type,
async_inference_config=async_inference_config,
serverless_inference_config=serverless_inference_config,
inference_recommendation_id=inference_recommendation_id,
inference_recommender_job_results=self.inference_recommender_job_results,
)

is_async = async_inference_config is not None
if is_async and not isinstance(async_inference_config, AsyncInferenceConfig):
raise ValueError("async_inference_config needs to be a AsyncInferenceConfig object")
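The net effect of the new block in ``Model.deploy()``: a recommendation ID (or cached ``right_size()`` results) fills in ``instance_type`` and ``initial_instance_count`` unless the caller supplies both explicitly. A sketch of the calling patterns, with a made-up model object and recommendation ID:

# 1) Let the recommendation pick instance type and count.
predictor = model.deploy(
    inference_recommendation_id="my-inference-rec-job/a1b2c3d4",  # placeholder
)

# 2) Override the recommendation by supplying BOTH values;
#    supplying only one of the two raises ValueError.
predictor = model.deploy(
    instance_type="ml.c5.2xlarge",
    initial_instance_count=2,
    inference_recommendation_id="my-inference-rec-job/a1b2c3d4",
)

# 3) Incompatible options raise ValueError, e.g.:
# model.deploy(
#     inference_recommendation_id="my-inference-rec-job/a1b2c3d4",
#     serverless_inference_config=ServerlessInferenceConfig(),
# )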
2 changes: 2 additions & 0 deletions src/sagemaker/tensorflow/model.py
@@ -323,6 +323,7 @@ def deploy(
volume_size=None,
model_data_download_timeout=None,
container_startup_health_check_timeout=None,
inference_recommendation_id=None,
):
"""Deploy a Tensorflow ``Model`` to a SageMaker ``Endpoint``."""

@@ -347,6 +348,7 @@ def deploy(
model_data_download_timeout=model_data_download_timeout,
container_startup_health_check_timeout=container_startup_health_check_timeout,
update_endpoint=update_endpoint,
inference_recommendation_id=inference_recommendation_id,
)

def _eia_supported(self):