Skip to content

Commit 27d2a88

Browse files
author
Rui Wang Napieralski
committed
fix: use itertuples to ingest pandas dataframe to FeatureStore
#2032
1 parent 0df9da6 commit 27d2a88

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

src/sagemaker/feature_store/feature_group.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,12 @@ def _ingest_single_batch(
179179
end_index (int): ending position to ingest in this batch.
180180
"""
181181
logger.info("Started ingesting index %d to %d", start_index, end_index)
182-
for _, row in data_frame[start_index:end_index].iterrows():
182+
for row in data_frame[start_index:end_index].itertuples(index=False):
183183
record = [
184-
FeatureValue(feature_name=name, value_as_string=str(value))
185-
for name, value in row.items()
184+
FeatureValue(
185+
feature_name=data_frame.columns[index], value_as_string=str(row[index])
186+
)
187+
for index in range(len(row))
186188
]
187189
sagemaker_session.put_record(
188190
feature_group_name=feature_group_name, record=[value.to_dict() for value in record]

tests/integ/test_feature_store.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ def pandas_data_frame():
105105
return df
106106

107107

108+
@pytest.fixture
109+
def pandas_data_frame_without_string():
110+
df = pd.DataFrame(
111+
{
112+
"feature1": pd.Series(np.arange(10), dtype="int64"),
113+
"feature2": pd.Series([time.time()] * 10, dtype="float64"),
114+
}
115+
)
116+
return df
117+
118+
108119
@pytest.fixture
109120
def record():
110121
return [
@@ -188,6 +199,34 @@ def test_create_feature_store(
188199
assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}")
189200

190201

202+
def test_ingest_without_string_feature(
203+
feature_store_session,
204+
role,
205+
feature_group_name,
206+
offline_store_s3_uri,
207+
pandas_data_frame_without_string,
208+
):
209+
feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=feature_store_session)
210+
feature_group.load_feature_definitions(data_frame=pandas_data_frame_without_string)
211+
212+
with cleanup_feature_group(feature_group):
213+
output = feature_group.create(
214+
s3_uri=offline_store_s3_uri,
215+
record_identifier_name="feature1",
216+
event_time_feature_name="feature2",
217+
role_arn=role,
218+
enable_online_store=True,
219+
)
220+
_wait_for_feature_group_create(feature_group)
221+
222+
ingestion_manager = feature_group.ingest(
223+
data_frame=pandas_data_frame_without_string, max_workers=3, wait=False
224+
)
225+
ingestion_manager.wait()
226+
227+
assert output["FeatureGroupArn"].endswith(f"feature-group/{feature_group_name}")
228+
229+
191230
def _wait_for_feature_group_create(feature_group: FeatureGroup):
192231
status = feature_group.describe().get("FeatureGroupStatus")
193232
while status == "Creating":

0 commit comments

Comments
 (0)