Skip to content

Commit 80102e5

Browse files
gwenguscGuifan Weng
andauthored
feature: support online store ttl for records (#4030)
* feature: support online store ttl for records * Added integ tests for online store ttl --------- Co-authored-by: Guifan Weng <[email protected]>
1 parent 035a8ac commit 80102e5

File tree

9 files changed

+496
-18
lines changed

9 files changed

+496
-18
lines changed

doc/amazon_sagemaker_featurestore.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ location of your offline store.
202202
    role_arn = role,
203203
    s3_uri = offline_feature_store_bucket,
204204
    enable_online_store = True,
205+
    ttl_duration = None,
205206
    online_store_kms_key_id = None,
206207
    offline_store_kms_key_id = None,
207208
    disable_glue_table_creation = False,

src/sagemaker/feature_store/feature_group.py

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
FeatureParameter,
6262
TableFormatEnum,
6363
DeletionModeEnum,
64+
TtlDuration,
65+
OnlineStoreConfigUpdate,
6466
)
6567
from sagemaker.utils import resolve_value_from_config
6668

@@ -523,6 +525,7 @@ def create(
523525
role_arn: str = None,
524526
online_store_kms_key_id: str = None,
525527
enable_online_store: bool = False,
528+
ttl_duration: TtlDuration = None,
526529
offline_store_kms_key_id: str = None,
527530
disable_glue_table_creation: bool = False,
528531
data_catalog_config: DataCatalogConfig = None,
@@ -539,6 +542,7 @@ def create(
539542
event_time_feature_name (str): name of the event time feature.
540543
role_arn (str): ARN of the role used to call CreateFeatureGroup.
541544
online_store_kms_key_id (str): KMS key ARN for online store (default: None).
545+
ttl_duration (TtlDuration): Default time to live duration for records (default: None).
542546
enable_online_store (bool): whether to enable online store or not (default: False).
543547
offline_store_kms_key_id (str): KMS key ARN for offline store (default: None).
544548
If a KMS encryption key is not specified, SageMaker encrypts all data at
@@ -592,7 +596,10 @@ def create(
592596

593597
# online store configuration
594598
if enable_online_store:
595-
online_store_config = OnlineStoreConfig(enable_online_store=enable_online_store)
599+
online_store_config = OnlineStoreConfig(
600+
enable_online_store=enable_online_store,
601+
ttl_duration=ttl_duration,
602+
)
596603
if online_store_kms_key_id is not None:
597604
online_store_config.online_store_security_config = OnlineStoreSecurityConfig(
598605
kms_key_id=online_store_kms_key_id
@@ -633,21 +640,37 @@ def describe(self, next_token: str = None) -> Dict[str, Any]:
633640
feature_group_name=self.name, next_token=next_token
634641
)
635642

636-
def update(self, feature_additions: Sequence[FeatureDefinition]) -> Dict[str, Any]:
643+
def update(
644+
self,
645+
feature_additions: Sequence[FeatureDefinition] = None,
646+
online_store_config: OnlineStoreConfigUpdate = None,
647+
) -> Dict[str, Any]:
637648
"""Update a FeatureGroup and add new features from the given feature definitions.
638649
639650
Args:
640651
feature_additions (Sequence[Dict[str, str]): list of feature definitions to be updated.
652+
online_store_config (OnlineStoreConfigUpdate): online store config to be updated.
641653
642654
Returns:
643655
Response dict from service.
644656
"""
645657

658+
if feature_additions is None:
659+
feature_additions_parameter = None
660+
else:
661+
feature_additions_parameter = [
662+
feature_addition.to_dict() for feature_addition in feature_additions
663+
]
664+
665+
if online_store_config is None:
666+
online_store_config_parameter = None
667+
else:
668+
online_store_config_parameter = online_store_config.to_dict()
669+
646670
return self.sagemaker_session.update_feature_group(
647671
feature_group_name=self.name,
648-
feature_additions=[
649-
feature_addition.to_dict() for feature_addition in feature_additions
650-
],
672+
feature_additions=feature_additions_parameter,
673+
online_store_config=online_store_config_parameter,
651674
)
652675

653676
def update_feature_metadata(
@@ -756,7 +779,9 @@ def load_feature_definitions(
756779
return self.feature_definitions
757780

758781
def get_record(
759-
self, record_identifier_value_as_string: str, feature_names: Sequence[str] = None
782+
self,
783+
record_identifier_value_as_string: str,
784+
feature_names: Sequence[str] = None,
760785
) -> Sequence[Dict[str, str]]:
761786
"""Get a single record in a FeatureGroup
762787
@@ -772,14 +797,24 @@ def get_record(
772797
feature_names=feature_names,
773798
).get("Record")
774799

775-
def put_record(self, record: Sequence[FeatureValue]):
800+
def put_record(self, record: Sequence[FeatureValue], ttl_duration: TtlDuration = None):
776801
"""Put a single record in the FeatureGroup.
777802
778803
Args:
779804
record (Sequence[FeatureValue]): a list contains feature values.
805+
ttl_duration (TtlDuration): customer specified ttl duration.
780806
"""
807+
808+
if ttl_duration is not None:
809+
return self.sagemaker_session.put_record(
810+
feature_group_name=self.name,
811+
record=[value.to_dict() for value in record],
812+
ttl_duration=ttl_duration.to_dict(),
813+
)
814+
781815
return self.sagemaker_session.put_record(
782-
feature_group_name=self.name, record=[value.to_dict() for value in record]
816+
feature_group_name=self.name,
817+
record=[value.to_dict() for value in record],
783818
)
784819

785820
def delete_record(

src/sagemaker/feature_store/feature_store.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,27 @@ def list_feature_groups(
137137
next_token=next_token,
138138
)
139139

140-
def batch_get_record(self, identifiers: Sequence[Identifier]) -> Dict[str, Any]:
140+
def batch_get_record(
141+
self,
142+
identifiers: Sequence[Identifier],
143+
expiration_time_response: str = None,
144+
) -> Dict[str, Any]:
141145
"""Get record in batch from FeatureStore
142146
143147
Args:
144148
identifiers (Sequence[Identifier]): A list of identifiers to uniquely identify records
145149
in FeatureStore.
150+
expiration_time_response (str): the field of expiration time response
151+
to toggle returning of expiresAt.
146152
147153
Returns:
148154
Response dict from service.
149155
"""
150156
batch_get_record_identifiers = [identifier.to_dict() for identifier in identifiers]
151-
return self.sagemaker_session.batch_get_record(identifiers=batch_get_record_identifiers)
157+
return self.sagemaker_session.batch_get_record(
158+
identifiers=batch_get_record_identifiers,
159+
expiration_time_response=expiration_time_response,
160+
)
152161

153162
def search(
154163
self,

src/sagemaker/feature_store/inputs.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,17 +84,43 @@ def to_dict(self) -> Dict[str, Any]:
8484
return Config.construct_dict(KmsKeyId=self.kms_key_id)
8585

8686

87+
@attr.s
88+
class TtlDuration(Config):
89+
"""TtlDuration for records in online FeatureStore.
90+
91+
Attributes:
92+
unit (str): time unit.
93+
value (int): time value.
94+
"""
95+
96+
unit: str = attr.ib()
97+
value: int = attr.ib()
98+
99+
def to_dict(self) -> Dict[str, Any]:
100+
"""Construct a dictionary based on the attributes.
101+
102+
Returns:
103+
dict represents the attributes.
104+
"""
105+
return Config.construct_dict(
106+
Unit=self.unit,
107+
Value=self.value,
108+
)
109+
110+
87111
@attr.s
88112
class OnlineStoreConfig(Config):
89113
"""OnlineStoreConfig for FeatureStore.
90114
91115
Attributes:
92116
enable_online_store (bool): whether to enable the online store.
93117
online_store_security_config (OnlineStoreSecurityConfig): configuration of security setting.
118+
ttl_duration (TtlDuration): Default time to live duration for records.
94119
"""
95120

96121
enable_online_store: bool = attr.ib(default=True)
97122
online_store_security_config: OnlineStoreSecurityConfig = attr.ib(default=None)
123+
ttl_duration: TtlDuration = attr.ib(default=None)
98124

99125
def to_dict(self) -> Dict[str, Any]:
100126
"""Construct a dictionary based on the attributes.
@@ -105,6 +131,28 @@ def to_dict(self) -> Dict[str, Any]:
105131
return Config.construct_dict(
106132
EnableOnlineStore=self.enable_online_store,
107133
SecurityConfig=self.online_store_security_config,
134+
TtlDuration=self.ttl_duration,
135+
)
136+
137+
138+
@attr.s
139+
class OnlineStoreConfigUpdate(Config):
140+
"""OnlineStoreConfigUpdate for FeatureStore.
141+
142+
Attributes:
143+
ttl_duration (TtlDuration): Default time to live duration for records.
144+
"""
145+
146+
ttl_duration: TtlDuration = attr.ib(default=None)
147+
148+
def to_dict(self) -> Dict[str, Any]:
149+
"""Construct a dictionary based on the attributes.
150+
151+
Returns:
152+
dict represents the attributes.
153+
"""
154+
return Config.construct_dict(
155+
TtlDuration=self.ttl_duration,
108156
)
109157

110158

@@ -379,3 +427,13 @@ class DeletionModeEnum(Enum):
379427

380428
SOFT_DELETE = "SoftDelete"
381429
HARD_DELETE = "HardDelete"
430+
431+
432+
class ExpirationTimeResponseEnum(Enum):
433+
"""Enum of toggling the response of ExpiresAt.
434+
435+
The ExpirationTimeResponse for toggling the response of ExpiresAt can be Disabled or Enabled.
436+
"""
437+
438+
DISABLED = "Disabled"
439+
ENABLED = "Enabled"

src/sagemaker/session.py

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2639,7 +2639,7 @@ def tune( # noqa: C901
26392639
warm_start_config (dict): Configuration defining the type of warm start and
26402640
other required configurations.
26412641
max_runtime_in_seconds (int or PipelineVariable): The maximum time in seconds
2642-
that a hyperparameter tuning job can run.
2642+
that a training job launched by a hyperparameter tuning job can run.
26432643
completion_criteria_config (sagemaker.tuner.TuningJobCompletionCriteriaConfig): A
26442644
configuration for the completion criteria.
26452645
early_stopping_type (str): Specifies whether early stopping is enabled for the job.
@@ -2894,7 +2894,7 @@ def _map_tuning_config(
28942894
tuning job.
28952895
max_parallel_jobs (int): Maximum number of parallel training jobs to start.
28962896
max_runtime_in_seconds (int or PipelineVariable): The maximum time in seconds
2897-
that a hyperparameter tuning job can run.
2897+
that a training job launched by a hyperparameter tuning job can run.
28982898
early_stopping_type (str): Specifies whether early stopping is enabled for the job.
28992899
Can be either 'Auto' or 'Off'. If set to 'Off', early stopping will not be
29002900
attempted. If set to 'Auto', early stopping of some training jobs may happen,
@@ -5097,9 +5097,15 @@ def describe_feature_group(
50975097
return self.sagemaker_client.describe_feature_group(**kwargs)
50985098

50995099
def update_feature_group(
5100-
self, feature_group_name: str, feature_additions: Sequence[Dict[str, str]]
5100+
self,
5101+
feature_group_name: str,
5102+
feature_additions: Sequence[Dict[str, str]] = None,
5103+
online_store_config: Dict[str, any] = None,
51015104
) -> Dict[str, Any]:
5102-
"""Update a FeatureGroup and add new features from the given feature definitions.
5105+
"""Update a FeatureGroup
5106+
5107+
either adding new features from the given feature definitions
5108+
or updating online store config
51035109
51045110
Args:
51055111
feature_group_name (str): name of the FeatureGroup to update.
@@ -5108,6 +5114,12 @@ def update_feature_group(
51085114
Response dict from service.
51095115
"""
51105116

5117+
if feature_additions is None:
5118+
return self.sagemaker_client.update_feature_group(
5119+
FeatureGroupName=feature_group_name,
5120+
OnlineStoreConfig=online_store_config,
5121+
)
5122+
51115123
return self.sagemaker_client.update_feature_group(
51125124
FeatureGroupName=feature_group_name, FeatureAdditions=feature_additions
51135125
)
@@ -5258,6 +5270,7 @@ def put_record(
52585270
self,
52595271
feature_group_name: str,
52605272
record: Sequence[Dict[str, str]],
5273+
ttl_duration: Dict[str, str] = None,
52615274
):
52625275
"""Puts a single record in the FeatureGroup.
52635276
@@ -5266,6 +5279,14 @@ def put_record(
52665279
record (Sequence[Dict[str, str]]): list of FeatureValue dicts to be ingested
52675280
into FeatureStore.
52685281
"""
5282+
5283+
if ttl_duration:
5284+
return self.sagemaker_featurestore_runtime_client.put_record(
5285+
FeatureGroupName=feature_group_name,
5286+
Record=record,
5287+
TtlDuration=ttl_duration,
5288+
)
5289+
52695290
return self.sagemaker_featurestore_runtime_client.put_record(
52705291
FeatureGroupName=feature_group_name,
52715292
Record=record,
@@ -5298,36 +5319,51 @@ def get_record(
52985319
record_identifier_value_as_string: str,
52995320
feature_group_name: str,
53005321
feature_names: Sequence[str],
5322+
expiration_time_response: str = None,
53015323
) -> Dict[str, Sequence[Dict[str, str]]]:
53025324
"""Gets a single record in the FeatureGroup.
53035325
53045326
Args:
53055327
record_identifier_value_as_string (str): name of the record identifier.
53065328
feature_group_name (str): name of the FeatureGroup.
53075329
feature_names (Sequence[str]): list of feature names.
5330+
expiration_time_response (str): the field of expiration time response
5331+
to toggle returning of expiresAt.
53085332
"""
53095333
get_record_args = {
53105334
"FeatureGroupName": feature_group_name,
53115335
"RecordIdentifierValueAsString": record_identifier_value_as_string,
53125336
}
53135337

5338+
if expiration_time_response:
5339+
get_record_args["ExpirationTimeResponse"] = expiration_time_response
5340+
53145341
if feature_names:
53155342
get_record_args["FeatureNames"] = feature_names
53165343

53175344
return self.sagemaker_featurestore_runtime_client.get_record(**get_record_args)
53185345

5319-
def batch_get_record(self, identifiers: Sequence[Dict[str, Any]]) -> Dict[str, Any]:
5346+
def batch_get_record(
5347+
self,
5348+
identifiers: Sequence[Dict[str, Any]],
5349+
expiration_time_response: str = None,
5350+
) -> Dict[str, Any]:
53205351
"""Gets a batch of record from FeatureStore.
53215352
53225353
Args:
53235354
identifiers (Sequence[Dict[str, Any]]): list of identifiers to uniquely identify records
53245355
in FeatureStore.
5356+
expiration_time_response (str): the field of expiration time response
5357+
to toggle returning of expiresAt.
53255358
53265359
Returns:
53275360
Response dict from service.
53285361
"""
53295362
batch_get_record_args = {"Identifiers": identifiers}
53305363

5364+
if expiration_time_response:
5365+
batch_get_record_args["ExpirationTimeResponse"] = expiration_time_response
5366+
53315367
return self.sagemaker_featurestore_runtime_client.batch_get_record(**batch_get_record_args)
53325368

53335369
def start_query_execution(

0 commit comments

Comments
 (0)