feature: Added transform with monitoring pipeline step in transformer #3438

Merged
merged 1 commit into from Dec 2, 2022
158 changes: 155 additions & 3 deletions src/sagemaker/transformer.py
@@ -14,14 +14,17 @@
from __future__ import absolute_import

from typing import Union, Optional, List, Dict
from botocore import exceptions
import logging
import copy
import time

from botocore import exceptions
from sagemaker.job import _Job
from sagemaker.session import Session
from sagemaker.session import Session, get_execution_role
from sagemaker.inputs import BatchDataCaptureConfig
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.functions import Join
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.workflow.pipeline_context import runnable_by_pipeline, PipelineSession
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.utils import base_name_from_image, name_from_base
@@ -266,6 +269,155 @@ def transform(
if wait:
self.latest_transform_job.wait(logs=logs)

def transform_with_monitoring(
self,
monitoring_config,
monitoring_resource_config,
data: str,
data_type: str = "S3Prefix",
content_type: str = None,
compression_type: str = None,
split_type: str = None,
input_filter: str = None,
output_filter: str = None,
join_source: str = None,
model_client_config: Dict[str, str] = None,
batch_data_capture_config: BatchDataCaptureConfig = None,
monitor_before_transform: bool = False,
supplied_baseline_statistics: str = None,
supplied_baseline_constraints: str = None,
wait: bool = True,
pipeline_name: str = None,
role: str = None,
):
"""Runs a transform job with monitoring job.

Note that this function will not start a transform job immediately,
instead, it will create a SageMaker Pipeline and execute it.
If you provide an existing pipeline_name, no new pipeline will be created, otherwise,
each transform_with_monitoring call will create a new pipeline and execute.

Args:
monitoring_config (Union[
`sagemaker.workflow.quality_check_step.QualityCheckConfig`,
`sagemaker.workflow.quality_check_step.ClarifyCheckConfig`
]): the monitoring configuration used to run model monitoring.
monitoring_resource_config (`sagemaker.workflow.check_job_config.CheckJobConfig`):
the check job (processing job) cluster resource configuration.
data (str): Input data location in S3 for the transform job.
data_type (str): What the S3 location defines (default: 'S3Prefix').
Valid values:
* 'S3Prefix' - the S3 URI defines a key name prefix. All objects with this prefix
will be used as inputs for the transform job.
* 'ManifestFile' - the S3 URI points to a single manifest file listing each S3
object to use as an input for the transform job.
content_type (str): MIME type of the input data (default: None).
compression_type (str): Compression type of the input data, if
compressed (default: None). Valid values: 'Gzip', None.
split_type (str): The record delimiter for the input object
(default: 'None'). Valid values: 'None', 'Line', 'RecordIO', and
'TFRecord'.
input_filter (str): A JSONPath to select a portion of the input to
pass to the algorithm container for inference. If you omit the
field, it gets the value '$', representing the entire input.
For CSV data, each row is taken as a JSON array,
so only index-based JSONPaths can be applied, e.g. $[0], $[1:].
CSV data should follow the `RFC format <https://tools.ietf.org/html/rfc4180>`_.
See `Supported JSONPath Operators
<https://docs.aws.amazon.com/sagemaker/latest/dg/batch-transform-data-processing.html#data-processing-operators>`_
for a table of supported JSONPath operators.
For more information, see the SageMaker API documentation for
`CreateTransformJob
<https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html>`_.
Some examples: "$[1:]", "$.features" (default: None).
output_filter (str): A JSONPath to select a portion of the
joined/original output to return as the output.
For more information, see the SageMaker API documentation for
`CreateTransformJob
<https://docs.aws.amazon.com/sagemaker/latest/dg/API_CreateTransformJob.html>`_.
Some examples: "$[1:]", "$.prediction" (default: None).
join_source (str): The source of data to be joined to the transform
output. It can be set to 'Input' meaning the entire input record
will be joined to the inference result. You can use OutputFilter
to select the useful portion before uploading to S3. (default:
None). Valid values: Input, None.
model_client_config (dict[str, str]): Model configuration.
Dictionary contains two optional keys,
'InvocationsTimeoutInSeconds', and 'InvocationsMaxRetries'.
(default: ``None``).
batch_data_capture_config (BatchDataCaptureConfig): Configuration object which
specifies the configurations related to the batch data capture for the transform job
(default: ``None``).
monitor_before_transform (bool): For the data quality or model explainability
monitoring types, setting this flag to True runs the check step before
the transform job rather than after it (default: False).
supplied_baseline_statistics (Union[str, PipelineVariable]): The S3 path
to the supplied statistics object representing the statistics JSON file
which will be used for the drift check (default: None).
supplied_baseline_constraints (Union[str, PipelineVariable]): The S3 path
to the supplied constraints object representing the constraints JSON file
which will be used for the drift check (default: None).
wait (bool): Whether to wait for the pipeline execution to complete (default: True).
pipeline_name (str): The name of the Pipeline for the monitoring and transform
steps (default: None). If not provided, a name is generated from a timestamp.
role (str): The IAM role used to create and execute the pipeline (default: None).
If not provided, the current execution role is used.
"""

transformer = self
if not isinstance(self.sagemaker_session, PipelineSession):
# transform() must run under a PipelineSession so that it returns pipeline step
# arguments instead of starting a transform job immediately. Detach the original
# session before deep-copying the transformer so it is not duplicated, then restore it.
sagemaker_session = self.sagemaker_session
self.sagemaker_session = None
transformer = copy.deepcopy(self)
transformer.sagemaker_session = PipelineSession()
self.sagemaker_session = sagemaker_session

transform_step_args = transformer.transform(
data=data,
data_type=data_type,
content_type=content_type,
compression_type=compression_type,
split_type=split_type,
input_filter=input_filter,
output_filter=output_filter,
batch_data_capture_config=batch_data_capture_config,
join_source=join_source,
model_client_config=model_client_config,
)

from sagemaker.workflow.monitor_batch_transform_step import MonitorBatchTransformStep

monitoring_batch_step = MonitorBatchTransformStep(
name="MonitorBatchTransformStep",
display_name="MonitorBatchTransformStep",
description="",
transform_step_args=transform_step_args,
monitor_configuration=monitoring_config,
check_job_configuration=monitoring_resource_config,
monitor_before_transform=monitor_before_transform,
supplied_baseline_constraints=supplied_baseline_constraints,
supplied_baseline_statistics=supplied_baseline_statistics,
)

pipeline_name = (
pipeline_name if pipeline_name else f"TransformWithMonitoring{int(time.time())}"
)
# upsert creates the pipeline if it does not already exist, otherwise it updates it
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
name=pipeline_name,
steps=[monitoring_batch_step],
sagemaker_session=transformer.sagemaker_session,
)
pipeline.upsert(role_arn=role if role else get_execution_role())
execution = pipeline.start()
if wait:
logging.info("Waiting for transform with monitoring to execute ...")
execution.wait()
return execution

def delete_model(self):
"""Delete the corresponding SageMaker model for this Transformer."""
self.sagemaker_session.delete_model(self.model_name)
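
For reference, here is a minimal sketch of how the new transform_with_monitoring API added in this PR could be called with a data quality check. The model name, bucket, S3 URIs, and role ARN are illustrative assumptions, not values taken from this PR.

```python
# Minimal usage sketch; model name, S3 URIs, and role ARN are assumptions.
from sagemaker.model_monitor import DatasetFormat
from sagemaker.transformer import Transformer
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig

role = "arn:aws:iam::123456789012:role/SageMakerRole"  # assumed execution role

transformer = Transformer(
    model_name="my-existing-model",  # assumed: an already-created SageMaker model
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path="s3://my-bucket/transform-output",
)

# Cluster configuration for the monitoring (processing) job.
check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
)

# Data quality check configuration built from a baseline dataset.
data_quality_config = DataQualityCheckConfig(
    baseline_dataset="s3://my-bucket/baseline/dataset.csv",
    dataset_format=DatasetFormat.csv(header=False),
    output_s3_uri="s3://my-bucket/monitoring-output",
)

# Creates a pipeline with a MonitorBatchTransformStep, upserts it, starts an
# execution, and (because wait defaults to True) blocks until it completes.
execution = transformer.transform_with_monitoring(
    monitoring_config=data_quality_config,
    monitoring_resource_config=check_job_config,
    data="s3://my-bucket/batch-input",
    content_type="text/csv",
    split_type="Line",
    monitor_before_transform=True,
    role=role,
)
```

Because no pipeline_name is passed, the call creates a pipeline named TransformWithMonitoring followed by a timestamp, upserts it with the given role, and waits for the execution to finish.
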
66 changes: 65 additions & 1 deletion tests/integ/test_transformer.py
@@ -25,6 +25,7 @@
from sagemaker.transformer import Transformer
from sagemaker.estimator import Estimator
from sagemaker.inputs import BatchDataCaptureConfig
from sagemaker.xgboost import XGBoostModel
from sagemaker.utils import unique_name_from_base
from tests.integ import (
datasets,
@@ -36,7 +37,7 @@
from tests.integ.timeout import timeout, timeout_and_delete_model_with_transformer
from tests.integ.vpc_test_utils import get_or_create_vpc_resources

from sagemaker.model_monitor import DatasetFormat, Statistics
from sagemaker.model_monitor import DatasetFormat, Statistics, Constraints

from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import (
@@ -645,3 +646,66 @@ def _create_transformer_and_transform_job(
job_name=unique_name_from_base("test-transform"),
)
return transformer


def test_transformer_and_monitoring_job(
pipeline_session,
sagemaker_session,
role,
pipeline_name,
check_job_config,
data_bias_check_config,
):
xgb_model_data_s3 = pipeline_session.upload_data(
path=os.path.join(os.path.join(DATA_DIR, "xgboost_abalone"), "xgb_model.tar.gz"),
key_prefix="integ-test-data/xgboost/model",
)
data_bias_supplied_baseline_constraints = Constraints.from_file_path(
constraints_file_path=os.path.join(
DATA_DIR, "pipeline/clarify_check_step/data_bias/good_cases/analysis.json"
),
sagemaker_session=sagemaker_session,
).file_s3_uri

xgb_model = XGBoostModel(
model_data=xgb_model_data_s3,
framework_version="1.3-1",
role=role,
sagemaker_session=sagemaker_session,
entry_point=os.path.join(os.path.join(DATA_DIR, "xgboost_abalone"), "inference.py"),
enable_network_isolation=True,
)

xgb_model.deploy(_INSTANCE_COUNT, _INSTANCE_TYPE)

transform_output = f"s3://{sagemaker_session.default_bucket()}/{pipeline_name}Transform"
transformer = Transformer(
model_name=xgb_model.name,
strategy="SingleRecord",
instance_type="ml.m5.xlarge",
instance_count=1,
output_path=transform_output,
sagemaker_session=pipeline_session,
)

transform_input = pipeline_session.upload_data(
path=os.path.join(DATA_DIR, "xgboost_abalone", "abalone"),
key_prefix="integ-test-data/xgboost_abalone/abalone",
)

execution = transformer.transform_with_monitoring(
monitoring_config=data_bias_check_config,
monitoring_resource_config=check_job_config,
data=transform_input,
content_type="text/libsvm",
supplied_baseline_constraints=data_bias_supplied_baseline_constraints,
role=role,
)

execution_steps = execution.list_steps()
assert len(execution_steps) == 2

for execution_step in execution_steps:
assert execution_step["StepStatus"] == "Succeeded"

xgb_model.delete_model()
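
The integration test above relies on pytest fixtures (pipeline_session, check_job_config, data_bias_check_config, plus the shared role, sagemaker_session, and pipeline_name fixtures) defined elsewhere in the test suite's conftest. A hedged sketch of what such fixtures could look like follows; the S3 paths, column names, and instance sizes are assumptions for illustration only.

```python
# Hypothetical fixture definitions; the real ones live in the integration suite's
# conftest. Column names, S3 paths, and instance sizes are assumptions.
import pytest

from sagemaker.clarify import BiasConfig, DataConfig
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig
from sagemaker.workflow.pipeline_context import PipelineSession


@pytest.fixture
def pipeline_session():
    # A PipelineSession defers .transform() calls so they can become pipeline steps.
    return PipelineSession()


@pytest.fixture
def check_job_config(role, sagemaker_session):
    # Cluster resources for the monitoring (processing) job.
    return CheckJobConfig(
        role=role,
        instance_count=1,
        instance_type="ml.c5.xlarge",
        volume_size_in_gb=30,
        sagemaker_session=sagemaker_session,
    )


@pytest.fixture
def data_bias_check_config(sagemaker_session):
    bucket = sagemaker_session.default_bucket()
    data_config = DataConfig(
        s3_data_input_path=f"s3://{bucket}/clarify/input.csv",  # assumed path
        s3_output_path=f"s3://{bucket}/clarify/output",         # assumed path
        label="target",                                         # assumed label column
        headers=["target", "feature_0", "feature_1"],
        dataset_type="text/csv",
    )
    bias_config = BiasConfig(
        label_values_or_threshold=[1],
        facet_name="feature_0",                                 # assumed facet column
    )
    return DataBiasCheckConfig(
        data_config=data_config,
        data_bias_config=bias_config,
    )
```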