Skip to content

feat: New methods to ingest and create Feature Groups #3707

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 30 commits into from
Sep 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1ed07be
feature: utilities to faciliate working with Feature Groups
JoseJuan98 Mar 9, 2023
cd6a48c
feature: utilities to facilitate working with Feature Groups
JoseJuan98 Mar 9, 2023
6aa67bc
fix: docstring style and linting
JoseJuan98 Mar 9, 2023
fabd4fd
doc: added more doc and examples
JoseJuan98 Mar 9, 2023
6ee4e03
fix: black check
JoseJuan98 Mar 9, 2023
5560de3
fix: integ tests
JoseJuan98 Mar 9, 2023
d936c29
fix: linting
JoseJuan98 Mar 9, 2023
5390e49
fix: boto session default endpoint url
JoseJuan98 Mar 9, 2023
b587fc5
fix: black checks
JoseJuan98 Mar 10, 2023
5532009
fix: integ test
JoseJuan98 Mar 10, 2023
b5b511f
fix: integ test
JoseJuan98 Mar 10, 2023
ffcb3c2
docs: added better docs
JoseJuan98 Mar 10, 2023
ee59c3b
docs: added better docs
JoseJuan98 Mar 10, 2023
1d10a0d
docs: added more docs
JoseJuan98 Mar 10, 2023
9eb872e
fix: docstyle
JoseJuan98 Mar 10, 2023
e26a6c9
fix: sphinx
JoseJuan98 Mar 10, 2023
ee1db55
fix: doc8 missing EOL
JoseJuan98 Mar 13, 2023
418da0e
fix: test_feature_group
JoseJuan98 Mar 14, 2023
b5669f6
feature: added more unit tests
JoseJuan98 Mar 14, 2023
b588b62
fix: black
JoseJuan98 Mar 14, 2023
a14fd65
fix: version invalid format
JoseJuan98 May 18, 2023
22d8228
fix: rm .coverage files
JoseJuan98 May 18, 2023
f12767b
fix: version invalid format
JoseJuan98 May 18, 2023
314fd16
fix: rm .coverage files
JoseJuan98 May 18, 2023
c450cfd
fix: DeprecationWarning: 'source deactivate' is deprecated. Use 'cond…
JoseJuan98 Jun 6, 2023
45e407d
fix: version invalid format
JoseJuan98 May 18, 2023
52ff5ab
fix: rm .coverage files
JoseJuan98 May 18, 2023
b6012e1
fix: version invalid format
JoseJuan98 May 18, 2023
eae39af
fix: rm .coverage files
JoseJuan98 May 18, 2023
ca543d9
fix: revert notebooks test command
JoseJuan98 Sep 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
build
src/*.egg-info
.cache
.coverage
.coverage*
sagemaker_venv*
*.egg-info
.tox
Expand Down
7 changes: 7 additions & 0 deletions doc/api/utility/featuregroup_utils.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FeatureGroup Utilities
----------------------

.. automodule:: sagemaker.feature_store.feature_utils
:members:
:undoc-members:
:show-inheritance:
11 changes: 9 additions & 2 deletions src/sagemaker/feature_store/feature_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,14 @@ def get_query_execution(self) -> Dict[str, Any]:
query_execution_id=self._current_query_execution_id
)

def as_dataframe(self) -> DataFrame:
def as_dataframe(self, **kwargs) -> DataFrame:
"""Download the result of the current query and load it into a DataFrame.

Args:
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
have a better tuning on data. For more info read:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

Returns:
A pandas DataFrame contains the query result.
"""
Expand All @@ -161,7 +166,9 @@ def as_dataframe(self) -> DataFrame:
query_execution_id=self._current_query_execution_id,
filename=output_filename,
)
return pd.read_csv(output_filename, delimiter=",")

kwargs.pop("delimiter", None)
return pd.read_csv(filepath_or_buffer=output_filename, delimiter=",", **kwargs)


@attr.s
Expand Down
341 changes: 341 additions & 0 deletions src/sagemaker/feature_store/feature_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,341 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Utilities for working with FeatureGroups and FeatureStores."""
from __future__ import absolute_import

import re
import logging

from typing import Union
from pathlib import Path

import pandas
import boto3
from pandas import DataFrame, Series, read_csv

from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.session import Session

logger = logging.getLogger(__name__)


def get_session_from_role(region: str, assume_role: str = None) -> Session:
"""Method used to get the :class:`sagemaker.session.Session` from a region and/or a role.

Description:
If invoked from a session with a role that lacks permissions, it can temporarily
assume another role to perform certain tasks.
If `assume_role` is not specified it will attempt to use the default sagemaker
execution role to get the session to use the Feature Store runtime client.

Args:
assume_role (str): (Optional) role name to be assumed
region (str): region name

Returns:
:class:`sagemaker.session.Session`
"""
boto_session = boto3.Session(region_name=region)

# It will try to assume the role specified
if assume_role:
sts = boto_session.client("sts", region_name=region)

credentials = sts.assume_role(
RoleArn=assume_role, RoleSessionName="SagemakerExecution"
).get("Credentials", {})

access_key_id = credentials.get("AccessKeyId", None)
secret_access_key = credentials.get("SecretAccessKey", None)
session_token = credentials.get("SessionToken", None)

boto_session = boto3.session.Session(
region_name=region,
aws_access_key_id=access_key_id,
aws_secret_access_key=secret_access_key,
aws_session_token=session_token,
)

sagemaker_session = Session(
boto_session=boto_session,
sagemaker_client=boto_session.client("sagemaker"),
sagemaker_runtime_client=boto_session.client("sagemaker-runtime"),
sagemaker_featurestore_runtime_client=boto_session.client(
service_name="sagemaker-featurestore-runtime"
),
)

return sagemaker_session


def get_feature_group_as_dataframe(
feature_group_name: str,
athena_bucket: str,
query: str = """SELECT * FROM "sagemaker_featurestore"."#{table}"
WHERE is_deleted=False """,
role: str = None,
region: str = None,
session=None,
event_time_feature_name: str = None,
latest_ingestion: bool = True,
verbose: bool = True,
**kwargs,
) -> DataFrame:
""":class:`sagemaker.feature_store.feature_group.FeatureGroup` as :class:`pandas.DataFrame`

Examples:
>>> from sagemaker.feature_store.feature_utils import get_feature_group_as_dataframe
>>>
>>> region = "eu-west-1"
>>> fg_data = get_feature_group_as_dataframe(feature_group_name="feature_group",
>>> athena_bucket="s3://bucket/athena_queries",
>>> region=region,
>>> event_time_feature_name="EventTimeId"
>>> )
>>>
>>> type(fg_data)
<class 'pandas.core.frame.DataFrame'>

Description:
Method to run an athena query over a
:class:`sagemaker.feature_store.feature_group.FeatureGroup` in a Feature Store
to retrieve its data. It needs the :class:`sagemaker.session.Session` linked to a role
or the region and/or role used to work with Feature Stores (it uses the module
`sagemaker.feature_store.feature_utils.get_session_from_role`
to get the session).

Args:
region (str): region of the target Feature Store
feature_group_name (str): feature store name
query (str): query to run. By default, it will take the latest ingest with data that
wasn't deleted. If latest_ingestion is False it will take all the data
in the feature group that wasn't deleted. It needs to use the keyword
"#{table}" to refer to the FeatureGroup name. e.g.:
'SELECT * FROM "sagemaker_featurestore"."#{table}"'
It must not end by ';'.
athena_bucket (str): Amazon S3 bucket for running the query
role (str): role to be assumed to extract data from feature store. If not specified
the default sagemaker execution role will be used.
session (str): :class:`sagemaker.session.Session`
of SageMaker used to work with the feature store. Optional, with
role and region parameters it will infer the session.
event_time_feature_name (str): eventTimeId feature. Mandatory only if the
latest ingestion is True.
latest_ingestion (bool): if True it will get the data only from the latest ingestion.
If False it will take whatever is specified in the query, or
if not specify it, it will get all the data that wasn't deleted.
verbose (bool): if True show messages, if False is silent.
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
have a better tuning on data. For more info read:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
Returns:
:class:`pandas.DataFrame`: dataset with the data retrieved from feature group
"""

logger.setLevel(logging.WARNING)
if verbose:
logger.setLevel(logging.INFO)

if latest_ingestion:
if event_time_feature_name is not None:
query += str(
f"AND {event_time_feature_name}=(SELECT "
f"MAX({event_time_feature_name}) FROM "
'"sagemaker_featurestore"."#{table}")'
)
else:
exc = Exception(
"Argument event_time_feature_name must be specified "
"when using latest_ingestion=True."
)
logger.exception(exc)
raise exc

query += ";"

if session is not None:
sagemaker_session = session
elif region is not None:
sagemaker_session = get_session_from_role(region=region, assume_role=role)
else:
exc = Exception("Argument Session or role and region must be specified.")
logger.exception(exc)
raise exc

msg = f"Feature Group used: {feature_group_name}"
logger.info(msg)

fg = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)

sample_query = fg.athena_query()
query_string = re.sub(r"#\{(table)\}", sample_query.table_name, query)

msg = f"Running query:\n\t{sample_query} \n\n\t-> Save on bucket {athena_bucket}\n"
logger.info(msg)

sample_query.run(query_string=query_string, output_location=athena_bucket)

sample_query.wait()

# run Athena query. The output is loaded to a Pandas dataframe.
dataset = sample_query.as_dataframe(**kwargs)

msg = f"Data shape retrieve from {feature_group_name}: {dataset.shape}"
logger.info(msg)

return dataset


def _format_column_names(data: pandas.DataFrame) -> pandas.DataFrame:
"""Formats the column names for :class:`sagemaker.feature_store.feature_group.FeatureGroup`

Description:
Module to format correctly the name of the columns of a DataFrame
to later generate the features names of a Feature Group

Args:
data (:class:`pandas.DataFrame`): dataframe used

Returns:
:class:`pandas.DataFrame`
"""
data.rename(columns=lambda x: x.replace(" ", "_").replace(".", "").lower()[:62], inplace=True)
return data


def _cast_object_to_string(data_frame: pandas.DataFrame) -> pandas.DataFrame:
"""Cast properly pandas object types to strings

Description:
Method to convert 'object' and 'O' column dtypes of a pandas.DataFrame to
a valid string type recognized by Feature Groups.

Args:
data_frame: dataframe used
Returns:
pandas.DataFrame
"""
for label in data_frame.select_dtypes(["object", "O"]).columns.tolist():
data_frame[label] = data_frame[label].astype("str").astype("string")
return data_frame


def prepare_fg_from_dataframe_or_file(
dataframe_or_path: Union[str, Path, pandas.DataFrame],
feature_group_name: str,
role: str = None,
region: str = None,
session=None,
record_id: str = "record_id",
event_id: str = "data_as_of_date",
verbose: bool = False,
**kwargs,
) -> FeatureGroup:
"""Prepares a dataframe to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`

Description:
Function to prepare a :class:`pandas.DataFrame` read from a path to a csv file or pass it
directly to create a :class:`sagemaker.feature_store.feature_group.FeatureGroup`.
The path to the file needs proper dtypes, feature names and mandatory features (record_id,
event_id).
It needs the :class:`sagemaker.session.Session` linked to a role
or the region and/or role used to work with Feature Stores (it uses the module
`sagemaker.feature_store.feature_utils.get_session_from_role`
to get the session).
If record_id or event_id are not specified it will create ones
by default with the names 'record_id' and 'data_as_of_date'.

Args:
feature_group_name (str): feature group name
dataframe_or_path (str, Path, pandas.DataFrame) : pandas.DataFrame or path to the data
verbose (bool) : True for displaying messages, False for silent method.
record_id (str, 'record_id'): (Optional) Feature identifier of the rows. If specified each
value of that feature has to be unique. If not specified or
record_id='record_id', then it will create a new feature from
the index of the pandas.DataFrame.
event_id (str) : (Optional) Feature with the time of the creation of data rows.
If not specified it will create one with the current time
called `data_as_of_date`
role (str) : role used to get the session.
region (str) : region used to get the session.
session (str): session of SageMaker used to work with the feature store
**kwargs (object): key arguments used for the method pandas.read_csv to be able to
have a better tuning on data. For more info read:
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html

Returns:
:class:`sagemaker.feature_store.feature_group.FeatureGroup`:
FG prepared with all the methods and definitions properly defined
"""

logger.setLevel(logging.WARNING)
if verbose:
logger.setLevel(logging.INFO)

if isinstance(dataframe_or_path, DataFrame):
data = dataframe_or_path
elif isinstance(dataframe_or_path, str):
kwargs.pop("filepath_or_buffer", None)
data = read_csv(filepath_or_buffer=dataframe_or_path, **kwargs)
else:
exc = Exception(
str(
f"Invalid type {type(dataframe_or_path)} for "
"argument dataframe_or_path. \nParameter must be"
" of type pandas.DataFrame or string"
)
)
logger.exception(exc)
raise exc

# Formatting cols
data = _format_column_names(data=data)
data = _cast_object_to_string(data_frame=data)

if record_id == "record_id" and record_id not in data.columns:
data[record_id] = data.index

lg_uniq = len(data[record_id].unique())
lg_id = len(data[record_id])

if lg_id != lg_uniq:
exc = Exception(
str(
f"Record identifier {record_id} have {abs(lg_id - lg_uniq)} "
"duplicated rows. \nRecord identifier must be unique"
" in each row."
)
)
logger.exception(exc)
raise exc

if event_id not in data.columns:
import time

current_time_sec = int(round(time.time()))
data[event_id] = Series([current_time_sec] * lg_id, dtype="float64")

if session is not None:
sagemaker_session = session
elif role is not None and region is not None:
sagemaker_session = get_session_from_role(region=region)
else:
exc = Exception("Argument Session or role and region must be specified.")
logger.exception(exc)
raise exc

feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)

feature_group.load_feature_definitions(data_frame=data)

return feature_group
Loading