Skip to content

Commit 93e6e3c

Browse files
authored
[Feature] Propagate tags to lineage resources (#4027)
1 parent 80102e5 commit 93e6e3c

File tree

5 files changed

+355
-40
lines changed

5 files changed

+355
-40
lines changed

src/sagemaker/feature_store/feature_processor/feature_scheduler.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,8 @@ def to_pipeline(
129129
code for Lineage tracking. This code is not used for actual transformation.
130130
max_retries (Optional[int]): The number of times to retry sagemaker pipeline step.
131131
If not specified, sagemaker pipeline step will not retry.
132-
tags (List[Tuple[str, str]): A list of tags attached to the pipeline. If not specified,
133-
no custom tags will be attached to the pipeline.
132+
tags (List[Tuple[str, str]]): A list of tags attached to the pipeline and all corresponding
133+
lineage resources that support tags. If not specified, no custom tags will be attached.
134134
sagemaker_session (Optional[Session]): Session object which manages interactions
135135
with Amazon SageMaker APIs and any other AWS services needed. If not specified, the
136136
function creates one using the default AWS configuration chain.
@@ -220,6 +220,9 @@ def to_pipeline(
220220

221221
describe_pipeline_response = pipeline.describe()
222222
pipeline_arn = describe_pipeline_response["PipelineArn"]
223+
tags_propagate_to_lineage_resources = _get_tags_from_pipeline_to_propagate_to_lineage_resources(
224+
pipeline_arn, _sagemaker_session
225+
)
223226

224227
lineage_handler = FeatureProcessorLineageHandler(
225228
pipeline_name=pipeline_name,
@@ -230,7 +233,8 @@ def to_pipeline(
230233
transformation_code=transformation_code,
231234
sagemaker_session=_sagemaker_session,
232235
)
233-
lineage_handler.create_lineage()
236+
lineage_handler.create_lineage(tags_propagate_to_lineage_resources)
237+
lineage_handler.upsert_tags_for_lineage_resources(tags_propagate_to_lineage_resources)
234238

235239
pipeline_lineage_names: Dict[str, str] = lineage_handler.get_pipeline_lineage_names()
236240

@@ -299,15 +303,19 @@ def schedule(
299303
describe_pipeline_response = _sagemaker_session.sagemaker_client.describe_pipeline(
300304
PipelineName=pipeline_name
301305
)
306+
pipeline_arn = describe_pipeline_response["PipelineArn"]
307+
tags_propagate_to_lineage_resources = _get_tags_from_pipeline_to_propagate_to_lineage_resources(
308+
pipeline_arn, _sagemaker_session
309+
)
302310

303311
logger.info("Creating/Updating EventBridge Schedule for pipeline %s.", pipeline_name)
304312
event_bridge_schedule_arn = event_bridge_scheduler_helper.upsert_schedule(
305-
pipeline_name,
306-
describe_pipeline_response["PipelineArn"],
307-
schedule_expression,
308-
state,
309-
_start_date,
310-
_role_arn,
313+
schedule_name=pipeline_name,
314+
pipeline_arn=pipeline_arn,
315+
schedule_expression=schedule_expression,
316+
state=state,
317+
start_date=_start_date,
318+
role=_role_arn,
311319
)
312320
logger.info("Created/Updated EventBridge Schedule for pipeline %s.", pipeline_name)
313321
lineage_handler = FeatureProcessorLineageHandler(
@@ -322,6 +330,7 @@ def schedule(
322330
schedule_expression=schedule_expression,
323331
state=state,
324332
start_date=_start_date,
333+
tags=tags_propagate_to_lineage_resources,
325334
)
326335
return event_bridge_schedule_arn["ScheduleArn"]
327336

@@ -805,7 +814,7 @@ def _get_feature_processor_outputs(
805814
def _parse_name_from_arn(fg_uri: str) -> str:
806815
"""Parse the name from a string, if it's an ARN. Otherwise, return the string.
807816
808-
Attributes:
817+
Args:
809818
fg_uri (str): The Feature Group Name or ARN.
810819
811820
Returns:
@@ -816,3 +825,23 @@ def _parse_name_from_arn(fg_uri: str) -> str:
816825
feature_group_name = match.group(4)
817826
return feature_group_name
818827
return fg_uri
828+
829+
830+
def _get_tags_from_pipeline_to_propagate_to_lineage_resources(
831+
pipeline_arn: str, sagemaker_session: Session
832+
) -> List[Dict[str, str]]:
833+
"""Retrieve custom tags attached to a SageMaker pipeline.
834+
835+
Args:
836+
pipeline_arn (str): SageMaker Pipeline Arn.
837+
sagemaker_session (Session): Session object which manages interactions
838+
with Amazon SageMaker APIs and any other AWS services needed. Used to list the tags
839+
attached to the pipeline.
840+
841+
Returns:
842+
List[Dict[str, str]]: List of custom tags to be propagated to lineage resources.
843+
"""
844+
tags_in_pipeline = sagemaker_session.sagemaker_client.list_tags(ResourceArn=pipeline_arn)[
845+
"Tags"
846+
]
847+
return [d for d in tags_in_pipeline if d["Key"] not in TO_PIPELINE_RESERVED_TAG_KEYS]

src/sagemaker/feature_store/feature_processor/lineage/_feature_processor_lineage.py

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@
1818
import attr
1919
from botocore.exceptions import ClientError
2020

21+
from sagemaker.feature_store.feature_processor._event_bridge_scheduler_helper import (
22+
EventBridgeSchedulerHelper,
23+
)
2124
from sagemaker.feature_store.feature_processor.lineage._lineage_association_handler import (
2225
LineageAssociationHandler,
2326
)
@@ -102,7 +105,7 @@ class FeatureProcessorLineageHandler:
102105
output: str = attr.ib(default=None)
103106
transformation_code: TransformationCode = attr.ib(default=None)
104107

105-
def create_lineage(self) -> None:
108+
def create_lineage(self, tags: Optional[List[Dict[str, str]]] = None) -> None:
106109
"""Create and Update Feature Processor Lineage"""
107110
input_feature_group_contexts: List[
108111
FeatureGroupContexts
@@ -120,6 +123,8 @@ def create_lineage(self) -> None:
120123
)
121124
if transformation_code_artifact is not None:
122125
logger.info("Created Transformation Code Artifact: %s", transformation_code_artifact)
126+
if tags:
127+
transformation_code_artifact.set_tags(tags) # pylint: disable=E1101
123128
# Create the Pipeline Lineage for the first time
124129
if not self._check_if_pipeline_lineage_exists():
125130
self._create_new_pipeline_lineage(
@@ -160,6 +165,7 @@ def create_schedule_lineage(
160165
schedule_expression,
161166
state,
162167
start_date: datetime,
168+
tags: Optional[List[Dict[str, str]]] = None,
163169
) -> None:
164170
"""Create and update the FeatureProcessor schedule lineage entities.
165171
@@ -171,10 +177,12 @@ def create_schedule_lineage(
171177
state (str): Specifies whether the schedule is enabled or disabled. Valid values are
172178
ENABLED and DISABLED. See https://docs.aws.amazon.com/scheduler/latest/APIReference/
173179
API_CreateSchedule.html#scheduler-CreateSchedule-request-State for more details.
174-
If not specified, it will default to DISABLED.
180+
If not specified, it will default to ENABLED.
175181
start_date (Optional[datetime]): The date, in UTC, after which the schedule can begin
176182
invoking its target. Depending on the schedule’s recurrence expression, invocations
177183
might occur on, or after, the StartDate you specify.
184+
tags (Optional[List[Dict[str, str]]]): Custom tags to be attached to schedule
185+
lineage resource.
178186
"""
179187
pipeline_context: Context = self._get_pipeline_context()
180188
pipeline_version_context: Context = self._get_pipeline_version_context(
@@ -192,13 +200,55 @@ def create_schedule_lineage(
192200
pipeline_schedule=pipeline_schedule,
193201
sagemaker_session=self.sagemaker_session,
194202
)
203+
if tags:
204+
schedule_artifact.set_tags(tags)
195205

196206
LineageAssociationHandler.add_upstream_schedule_associations(
197207
schedule_artifact=schedule_artifact,
198208
pipeline_version_context_arn=pipeline_version_context.context_arn,
199209
sagemaker_session=self.sagemaker_session,
200210
)
201211

212+
def upsert_tags_for_lineage_resources(self, tags: List[Dict[str, str]]) -> None:
213+
"""Add or update tags for lineage resources.
214+
215+
Tags attached to the sagemaker pipeline are used as the source of truth.
216+
217+
Args:
218+
tags (List[Dict[str, str]]): Custom tags to be attached to lineage resources.
219+
"""
220+
if not tags:
221+
return
222+
pipeline_context: Context = self._get_pipeline_context()
223+
current_pipeline_version_context: Context = self._get_pipeline_version_context(
224+
last_update_time=pipeline_context.properties[LAST_UPDATE_TIME]
225+
)
226+
input_raw_data_artifacts: List[Artifact] = self._retrieve_input_raw_data_artifacts()
227+
pipeline_context.set_tags(tags)
228+
current_pipeline_version_context.set_tags(tags)
229+
for input_raw_data_artifact in input_raw_data_artifacts:
230+
input_raw_data_artifact.set_tags(tags)
231+
232+
event_bridge_scheduler_helper = EventBridgeSchedulerHelper(
233+
self.sagemaker_session,
234+
self.sagemaker_session.boto_session.client("scheduler"),
235+
)
236+
event_bridge_schedule = event_bridge_scheduler_helper.describe_schedule(self.pipeline_name)
237+
238+
if event_bridge_schedule:
239+
schedule_artifact_summary = S3LineageEntityHandler._load_artifact_from_s3_uri(
240+
s3_uri=event_bridge_schedule["Arn"],
241+
sagemaker_session=self.sagemaker_session,
242+
)
243+
if schedule_artifact_summary is not None:
244+
pipeline_schedule_artifact: Artifact = (
245+
S3LineageEntityHandler.load_artifact_from_arn(
246+
artifact_arn=schedule_artifact_summary.artifact_arn,
247+
sagemaker_session=self.sagemaker_session,
248+
)
249+
)
250+
pipeline_schedule_artifact.set_tags(tags)
251+
202252
def _create_new_pipeline_lineage(
203253
self,
204254
input_feature_group_contexts: List[FeatureGroupContexts],

tests/unit/sagemaker/feature_store/feature_processor/lineage/test_constants.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,6 @@
313313
},
314314
}
315315

316-
317316
PIPELINE_CONTEXT: Context = Context(
318317
context_arn=f"{PIPELINE_NAME}-context-arn",
319318
context_name=f"sm-fs-fe-{PIPELINE_NAME}-{CREATION_TIME}-fep",
@@ -341,7 +340,6 @@
341340
name="test-name",
342341
)
343342

344-
345343
TRANSFORMATION_CODE_INPUT_2: TransformationCode = TransformationCode(
346344
s3_uri="s3://sagemaker-us-west-2-789975069016/transform-2023-04-28-21-50-14-616/"
347345
"transform-2023-04-28-21-50-14-616/output/model.tar.gz/2",

0 commit comments

Comments
 (0)