 )
 from sagemaker.session import get_execution_role, Session
 from tests.integ.timeout import timeout
+from urllib.parse import urlparse
 
 BUCKET_POLICY = {
     "Version": "2012-10-17",
@@ -635,8 +636,8 @@ def test_create_dataset_with_feature_group_base(
         )
 
         with timeout(minutes=10) and cleanup_offline_store(
-            base_table_name, feature_store_session
-        ) and cleanup_offline_store(feature_group_table_name, feature_store_session):
+            base, feature_store_session
+        ) and cleanup_offline_store(feature_group, feature_store_session):
             feature_store = FeatureStore(sagemaker_session=feature_store_session)
             df, query_string = (
                 feature_store.create_dataset(base=base, output_path=offline_store_s3_uri)
@@ -663,7 +664,7 @@ def test_create_dataset_with_feature_group_base(
 
             assert sorted_df.equals(expect_df)
             assert (
-                query_string
+                query_string.strip()
                 == "WITH fg_base AS (WITH table_base AS (\n"
                 + "SELECT *\n"
                 + "FROM (\n"
@@ -817,8 +818,8 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
         )
 
         with timeout(minutes=10) and cleanup_offline_store(
-            base_table_name, feature_store_session
-        ) and cleanup_offline_store(feature_group_table_name, feature_store_session):
+            base, feature_store_session
+        ) and cleanup_offline_store(feature_group, feature_store_session):
             feature_store = FeatureStore(sagemaker_session=feature_store_session)
             df, query_string = (
                 feature_store.create_dataset(base=base, output_path=offline_store_s3_uri)
@@ -850,7 +851,7 @@ def test_create_dataset_with_feature_group_base_with_additional_params(
 
             assert sorted_df.equals(expect_df)
             assert (
-                query_string
+                query_string.strip()
                 == "WITH fg_base AS (WITH table_base AS (\n"
                 + "SELECT *\n"
                 + "FROM (\n"
@@ -1068,25 +1069,30 @@ def cleanup_feature_group(feature_group: FeatureGroup):
 
 
 @contextmanager
-def cleanup_offline_store(table_name: str, feature_store_session: Session):
+def cleanup_offline_store(feature_group: FeatureGroup, feature_store_session: Session):
     try:
         yield
     finally:
+        feature_group_metadata = feature_group.describe()
+        feature_group_name = feature_group_metadata['FeatureGroupName']
         try:
+            s3_uri = feature_group_metadata['OfflineStoreConfig']['S3StorageConfig']['ResolvedOutputS3Uri']
+            parsed_uri = urlparse(s3_uri)
+            bucket_name, prefix = parsed_uri.netloc, parsed_uri.path
+            prefix = prefix.strip('/')
+            prefix = prefix[:-5] if prefix.endswith('/data') else prefix
             region_name = feature_store_session.boto_session.region_name
             s3_client = feature_store_session.boto_session.client(
                 service_name="s3", region_name=region_name
             )
-            account_id = feature_store_session.account_id()
-            bucket_name = f"sagemaker-test-featurestore-{region_name}-{account_id}"
             response = s3_client.list_objects_v2(
                 Bucket=bucket_name,
-                Prefix=f"{account_id}/sagemaker/{region_name}/offline-store/{table_name}/",
+                Prefix=prefix
             )
             files_in_folder = response["Contents"]
             files_to_delete = []
             for f in files_in_folder:
                 files_to_delete.append({"Key": f["Key"]})
             s3_client.delete_objects(Bucket=bucket_name, Delete={"Objects": files_to_delete})
-        except Exception:
-            raise RuntimeError(f"Failed to delete data under {table_name}")
+        except Exception as e:
+            raise RuntimeError(f"Failed to delete data for feature_group {feature_group_name}", e)