Skip to content

Commit 34fb6c6

Browse files
committed
feature: utilities to facilitate working with Feature Groups
1 parent c211e48 commit 34fb6c6

File tree

4 files changed

+196
-3
lines changed

4 files changed

+196
-3
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
FeatureGroup Utilities
2+
----------------------
3+
4+
.. automodule:: sagemaker.feature_group_utils
5+
:members:
6+
:undoc-members:
7+
:show-inheritance:

src/sagemaker/feature_store/feature_group.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,9 +129,15 @@ def get_query_execution(self) -> Dict[str, Any]:
129129
query_execution_id=self._current_query_execution_id
130130
)
131131

132-
def as_dataframe(self) -> DataFrame:
132+
def as_dataframe(self, **kwargs) -> DataFrame:
133133
"""Download the result of the current query and load it into a DataFrame.
134134
135+
Args:
136+
kwargs: key arguments used for the method pandas.read_csv to be able to have
137+
a better tuning on data.
138+
For more info read
139+
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
140+
135141
Returns:
136142
A pandas DataFrame contains the query result.
137143
"""
@@ -152,7 +158,9 @@ def as_dataframe(self) -> DataFrame:
152158
query_execution_id=self._current_query_execution_id,
153159
filename=output_filename,
154160
)
155-
return pd.read_csv(output_filename, delimiter=",")
161+
162+
kwargs.pop("delimiter", None)
163+
return pd.read_csv(filepath_or_buffer=output_filename, delimiter=",", **kwargs)
156164

157165

158166
@attr.s

tests/integ/test_feature_store.py

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
# language governing permissions and limitations under the License.
1313
from __future__ import absolute_import
1414

15+
import datetime
1516
import json
1617
import time
17-
import datetime
1818
from contextlib import contextmanager
1919

2020
import boto3
@@ -23,6 +23,7 @@
2323
import pytest
2424
from pandas import DataFrame
2525

26+
from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe
2627
from sagemaker.feature_store.feature_definition import FractionalFeatureDefinition
2728
from sagemaker.feature_store.feature_group import FeatureGroup
2829
from sagemaker.feature_store.feature_store import FeatureStore
@@ -1055,6 +1056,82 @@ def _wait_for_feature_group_update(feature_group: FeatureGroup):
10551056
print(f"FeatureGroup {feature_group.name} successfully updated.")
10561057

10571058

1059+
def test_get_feature_group_with_role_region(
1060+
feature_store_session,
1061+
role,
1062+
feature_group_name,
1063+
offline_store_s3_uri,
1064+
pandas_data_frame,
1065+
):
1066+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
1067+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
1068+
1069+
with cleanup_feature_group(feature_group):
1070+
output = feature_group.create(
1071+
s3_uri=offline_store_s3_uri,
1072+
record_identifier_name="feature1",
1073+
event_time_feature_name="feature3",
1074+
role_arn=role,
1075+
enable_online_store=True,
1076+
)
1077+
_wait_for_feature_group_create(feature_group)
1078+
1079+
feature_group.ingest(
1080+
data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True
1081+
)
1082+
1083+
dataset = get_feature_group_as_dataframe(
1084+
feature_group_name=feature_group_name,
1085+
region=region_name,
1086+
role=role,
1087+
event_time_feature_name="feature3",
1088+
latest_ingestion=True,
1089+
athena_bucket=f"{offline_store_s3_uri}/query",
1090+
)
1091+
1092+
assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}")
1093+
assert not dataset.empty
1094+
assert isinstance(dataset, DataFrame)
1095+
1096+
1097+
def test_get_feature_group_with_session(
1098+
feature_store_session,
1099+
role,
1100+
feature_group_name,
1101+
offline_store_s3_uri,
1102+
pandas_data_frame,
1103+
):
1104+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
1105+
feature_group.load_feature_definitions(data_frame=pandas_data_frame)
1106+
1107+
with cleanup_feature_group(feature_group):
1108+
output = feature_group.create(
1109+
s3_uri=offline_store_s3_uri,
1110+
record_identifier_name="feature1",
1111+
event_time_feature_name="feature3",
1112+
role_arn=role,
1113+
enable_online_store=True,
1114+
)
1115+
_wait_for_feature_group_create(feature_group)
1116+
1117+
feature_group.ingest(
1118+
data_frame=pandas_data_frame, max_workers=3, max_processes=2, wait=True
1119+
)
1120+
1121+
dataset = get_feature_group_as_dataframe(
1122+
feature_group_name=feature_group_name,
1123+
session=feature_store_session,
1124+
event_time_feature_name="feature3",
1125+
latest_ingestion=True,
1126+
athena_bucket=f"{offline_store_s3_uri}/query",
1127+
low_memory=False,
1128+
) # Using kwargs to pass a parameter to pandas.read_csv
1129+
1130+
assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}")
1131+
assert not dataset.empty
1132+
assert isinstance(dataset, DataFrame)
1133+
1134+
10581135
@contextmanager
10591136
def cleanup_feature_group(feature_group: FeatureGroup):
10601137
try:
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License"). You
4+
# may not use this file except in compliance with the License. A copy of
5+
# the License is located at
6+
#
7+
# http://aws.amazon.com/apache2.0/
8+
#
9+
# or in the "license" file accompanying this file. This file is
10+
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
11+
# ANY KIND, either express or implied. See the License for the specific
12+
# language governing permissions and limitations under the License.
13+
# language governing permissions and limitations under the License.
14+
"""Test for Feature Group Utils"""
15+
from __future__ import absolute_import
16+
17+
import pandas as pd
18+
import pytest
19+
from mock import Mock
20+
21+
from sagemaker.feature_store.feature_utils import (
22+
_cast_object_to_string,
23+
prepare_fg_from_dataframe_or_file,
24+
)
25+
from sagemaker.feature_store.feature_definition import (
26+
FeatureTypeEnum,
27+
)
28+
from sagemaker.feature_store.feature_group import (
29+
FeatureGroup,
30+
)
31+
32+
33+
class PicklableMock(Mock):
34+
"""Mock class use for tests"""
35+
36+
def __reduce__(self):
37+
"""Method from class Mock"""
38+
return (Mock, ())
39+
40+
41+
@pytest.fixture
42+
def sagemaker_session_mock():
43+
"""Fixture Mock class"""
44+
return Mock()
45+
46+
47+
def test_convert_unsupported_types_to_supported(sagemaker_session_mock):
48+
feature_group = FeatureGroup(name="FailedGroup", sagemaker_session=sagemaker_session_mock)
49+
df = pd.DataFrame(
50+
{
51+
"float": pd.Series([2.0], dtype="float64"),
52+
"int": pd.Series([2], dtype="int64"),
53+
"object": pd.Series(["f1"], dtype="object"),
54+
}
55+
)
56+
# Converting object or O type to string
57+
df = _cast_object_to_string(data_frame=df)
58+
59+
feature_definitions = feature_group.load_feature_definitions(data_frame=df)
60+
types = [fd.feature_type for fd in feature_definitions]
61+
62+
assert types == [
63+
FeatureTypeEnum.FRACTIONAL,
64+
FeatureTypeEnum.INTEGRAL,
65+
FeatureTypeEnum.STRING,
66+
]
67+
68+
69+
def test_prepare_fg_from_dataframe(sagemaker_session_mock):
70+
very_long_name = "long" * 20
71+
df = pd.DataFrame(
72+
{
73+
"space feature": pd.Series([2.0], dtype="float64"),
74+
"dot.feature": pd.Series([2], dtype="int64"),
75+
very_long_name: pd.Series(["f1"], dtype="string"),
76+
}
77+
)
78+
79+
feature_group = prepare_fg_from_dataframe_or_file(
80+
dataframe_or_path=df,
81+
session=sagemaker_session_mock,
82+
feature_group_name="testFG",
83+
)
84+
85+
names = [fd.feature_name for fd in feature_group.feature_definitions]
86+
types = [fd.feature_type for fd in feature_group.feature_definitions]
87+
88+
assert names == [
89+
"space_feature",
90+
"dotfeature",
91+
very_long_name[:62],
92+
"record_id",
93+
"data_as_of_date",
94+
]
95+
assert types == [
96+
FeatureTypeEnum.FRACTIONAL,
97+
FeatureTypeEnum.INTEGRAL,
98+
FeatureTypeEnum.STRING,
99+
FeatureTypeEnum.INTEGRAL,
100+
FeatureTypeEnum.FRACTIONAL,
101+
]

0 commit comments

Comments
 (0)