Skip to content

feature: Add support for tags in to_pipeline API for feature processor #3963

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/sagemaker/feature_store/feature_processor/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,8 @@
S3_DATA_DISTRIBUTION_TYPE = "FullyReplicated"
PIPELINE_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-context-name"
PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY = "sm-fs-fe:feature-engineering-pipeline-version-context-name"
# Tag keys that the to_pipeline API attaches/manages internally; user-supplied
# tags with any of these keys are rejected during validation so callers cannot
# overwrite the feature-processor lineage tagging.
TO_PIPELINE_RESERVED_TAG_KEYS = [
    FEATURE_PROCESSOR_TAG_KEY,
    PIPELINE_CONTEXT_NAME_TAG_KEY,
    PIPELINE_VERSION_CONTEXT_NAME_TAG_KEY,
]
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import re
from datetime import datetime
from typing import Callable, List, Optional, Dict, Sequence, Union, Any
from typing import Callable, List, Optional, Dict, Sequence, Union, Any, Tuple

import pytz
from botocore.exceptions import ClientError
Expand Down Expand Up @@ -58,6 +58,7 @@
PIPELINE_NAME_MAXIMUM_LENGTH,
RESOURCE_NOT_FOUND,
FEATURE_GROUP_ARN_REGEX_PATTERN,
TO_PIPELINE_RESERVED_TAG_KEYS,
)
from sagemaker.feature_store.feature_processor._feature_processor_config import (
FeatureProcessorConfig,
Expand Down Expand Up @@ -107,6 +108,7 @@ def to_pipeline(
role: Optional[str] = None,
transformation_code: Optional[TransformationCode] = None,
max_retries: Optional[int] = None,
tags: Optional[List[Tuple[str, str]]] = None,
sagemaker_session: Optional[Session] = None,
) -> str:
"""Creates a sagemaker pipeline that takes in a callable as a training step.
Expand All @@ -127,6 +129,8 @@ def to_pipeline(
code for Lineage tracking. This code is not used for actual transformation.
max_retries (Optional[int]): The number of times to retry sagemaker pipeline step.
If not specified, sagemaker pipline step will not retry.
tags (Optional[List[Tuple[str, str]]]): A list of tags attached to the pipeline. If not specified,
no custom tags will be attached to the pipeline.
sagemaker_session (Optional[Session]): Session object which manages interactions
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
function creates one using the default AWS configuration chain.
Expand All @@ -135,6 +139,8 @@ def to_pipeline(
"""

_validate_input_for_to_pipeline_api(pipeline_name, step)
if tags:
_validate_tags_for_to_pipeline_api(tags)

_sagemaker_session = sagemaker_session or Session()

Expand Down Expand Up @@ -200,12 +206,15 @@ def to_pipeline(
sagemaker_session=_sagemaker_session,
parameters=[SCHEDULED_TIME_PIPELINE_PARAMETER],
)
pipeline_tags = [dict(Key=FEATURE_PROCESSOR_TAG_KEY, Value=FEATURE_PROCESSOR_TAG_VALUE)]
if tags:
pipeline_tags.extend([dict(Key=k, Value=v) for k, v in tags])

pipeline = Pipeline(**pipeline_request_dict)
logger.info("Creating/Updating sagemaker pipeline %s", pipeline_name)
pipeline.upsert(
role_arn=_role,
tags=[dict(Key=FEATURE_PROCESSOR_TAG_KEY, Value=FEATURE_PROCESSOR_TAG_VALUE)],
tags=pipeline_tags,
)
logger.info("Created sagemaker pipeline %s", pipeline_name)

Expand Down Expand Up @@ -514,6 +523,23 @@ def _validate_input_for_to_pipeline_api(pipeline_name: str, step: Callable) -> N
)


def _validate_tags_for_to_pipeline_api(tags: List[Tuple[str, str]]) -> None:
    """Validate tags provided to to_pipeline API.

    Args:
        tags (List[Tuple[str, str]]): A list of tags attached to the pipeline.

    Raises (ValueError): raises ValueError when any of the following scenarios happens:
        1. reserved tag keys are provided to API.
    """
    # Collect the user-supplied keys once; set membership keeps each reserved-key
    # check O(1) while preserving the original first-match error behavior.
    user_supplied_keys = {key for key, _ in tags}
    for reserved_tag_key in TO_PIPELINE_RESERVED_TAG_KEYS:
        if reserved_tag_key in user_supplied_keys:
            raise ValueError(
                f"{reserved_tag_key} is a reserved tag key for to_pipeline API. Please choose another tag."
            )


def _validate_lineage_resources_for_to_pipeline_api(
feature_processor_config: FeatureProcessorConfig, sagemaker_session: Session
) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,18 @@ def transform(raw_s3_data_as_df):
step=transform,
role=get_execution_role(sagemaker_session),
max_retries=2,
tags=[("integ_test_tag_key_1", "integ_test_tag_key_2")],
sagemaker_session=sagemaker_session,
)
_sagemaker_client = get_sagemaker_client(sagemaker_session=sagemaker_session)

assert pipeline_arn is not None

tags = _sagemaker_client.list_tags(ResourceArn=pipeline_arn)["Tags"]

tag_keys = [tag["Key"] for tag in tags]
assert "integ_test_tag_key_1" in tag_keys

pipeline_description = Pipeline(name=pipeline_name).describe()
assert pipeline_arn == pipeline_description["PipelineArn"]
assert get_execution_role(sagemaker_session) == pipeline_description["RoleArn"]
Expand All @@ -570,7 +577,7 @@ def transform(raw_s3_data_as_df):

status = _wait_for_pipeline_execution_to_reach_terminal_state(
pipeline_execution_arn=pipeline_execution_arn,
sagemaker_client=get_sagemaker_client(sagemaker_session=sagemaker_session),
sagemaker_client=_sagemaker_client,
)
assert status == "Succeeded"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def test_to_pipeline(
step=wrapped_func,
role=EXECUTION_ROLE_ARN,
max_retries=1,
tags=[("tag_key_1", "tag_value_1"), ("tag_key_2", "tag_value_2")],
sagemaker_session=session,
)
assert pipeline_arn == PIPELINE_ARN
Expand Down Expand Up @@ -346,7 +347,11 @@ def test_to_pipeline(
[
call(
role_arn=EXECUTION_ROLE_ARN,
tags=[dict(Key=FEATURE_PROCESSOR_TAG_KEY, Value=FEATURE_PROCESSOR_TAG_VALUE)],
tags=[
dict(Key=FEATURE_PROCESSOR_TAG_KEY, Value=FEATURE_PROCESSOR_TAG_VALUE),
dict(Key="tag_key_1", Value="tag_value_1"),
dict(Key="tag_key_2", Value="tag_value_2"),
],
),
call(
role_arn=EXECUTION_ROLE_ARN,
Expand Down Expand Up @@ -527,6 +532,66 @@ def test_to_pipeline_pipeline_name_length_limit_exceeds(
)


@patch("sagemaker.remote_function.job.Session", return_value=mock_session())
@patch(
    "sagemaker.remote_function.job._JobSettings._get_default_spark_image",
    return_value="some_image_uri",
)
@patch("sagemaker.remote_function.job.get_execution_role", return_value=EXECUTION_ROLE_ARN)
def test_to_pipeline_used_reserved_tags(get_execution_role, mock_spark_image, session):
    """to_pipeline raises ValueError when a user-provided tag uses a reserved tag key."""
    session.sagemaker_config = None
    session.boto_region_name = TEST_REGION
    session.expand_role.return_value = EXECUTION_ROLE_ARN
    spark_config = SparkConfig(submit_files=["file_a", "file_b", "file_c"])
    job_settings = _JobSettings(
        spark_config=spark_config,
        s3_root_uri=S3_URI,
        role=EXECUTION_ROLE_ARN,
        include_local_workdir=True,
        instance_type="ml.m5.large",
        encrypt_inter_container_traffic=True,
        sagemaker_session=session,
    )
    # NOTE: the container-entrypoint/args fixtures present in sibling tests were
    # removed here — they were never referenced, and validation fails before any
    # job definition is built.

    mock_feature_processor_config = Mock(
        mode=FeatureProcessorMode.PYSPARK, inputs=[tdh.FEATURE_PROCESSOR_INPUTS], output="some_fg"
    )
    mock_feature_processor_config.mode.return_value = FeatureProcessorMode.PYSPARK

    wrapped_func = Mock(
        Callable,
        feature_processor_config=mock_feature_processor_config,
        job_settings=job_settings,
        wrapped_func=job_function,
    )
    wrapped_func.feature_processor_config.return_value = mock_feature_processor_config
    wrapped_func.job_settings.return_value = job_settings
    wrapped_func.wrapped_func.return_value = job_function

    # Tag validation runs before any pipeline resources are created, so the
    # reserved key must be rejected up front with a descriptive ValueError.
    with pytest.raises(
        ValueError,
        match="sm-fs-fe:created-from is a reserved tag key for to_pipeline API. Please choose another tag.",
    ):
        to_pipeline(
            pipeline_name="pipeline_name",
            step=wrapped_func,
            role=EXECUTION_ROLE_ARN,
            max_retries=1,
            tags=[("sm-fs-fe:created-from", "random")],
            sagemaker_session=session,
        )


@patch(
"sagemaker.feature_store.feature_processor.feature_scheduler._validate_pipeline_lineage_resources",
return_value=None,
Expand Down