@@ -43,34 +43,6 @@ class TableType(Enum):
43
43
DATA_FRAME = "DataFrame"
44
44
45
45
46
- @attr .s
47
- class JoinTypeEnum (Enum ):
48
- """Enum of Join types.
49
-
50
- The Join comparator can be "INNER_JOIN", "LEFT_JOIN", "RIGHT_JOIN", "FULL_JOIN"
51
- """
52
-
53
- INNER_JOIN = "JOIN"
54
- LEFT_JOIN = "LEFT JOIN"
55
- RIGHT_JOIN = "RIGHT JOIN"
56
- FULL_JOIN = "FULL JOIN"
57
-
58
-
59
- @attr .s
60
- class JoinComparatorEnum (Enum ):
61
- """Enum of Join comparators.
62
-
63
- The Join comparator can be "EQUALS", "GREATER_THAN", "LESS_THAN",
64
- "GREATER_THAN_OR_EQUAL_TO", or "LESS_THAN_OR_EQUAL_TO"
65
- """
66
-
67
- EQUALS = "="
68
- GREATER_THAN = ">"
69
- GREATER_THAN_OR_EQUAL_TO = ">="
70
- LESS_THAN = "<"
71
- LESS_THAN_OR_EQUAL_TO = "<="
72
-
73
-
74
46
@attr .s
75
47
class FeatureGroupToBeMerged :
76
48
"""FeatureGroup metadata which will be used for SQL join.
@@ -83,28 +55,19 @@ class FeatureGroupToBeMerged:
83
55
Attributes:
84
56
features (List[str]): A list of strings representing feature names of this FeatureGroup.
85
57
included_feature_names (List[str]): A list of strings representing features to be
86
- included in the SQL join.
58
+ included in the sql join.
87
59
projected_feature_names (List[str]): A list of strings representing features to be
88
60
included for final projection in output.
89
61
catalog (str): A string representing the catalog.
90
62
database (str): A string representing the database.
91
63
table_name (str): A string representing the Athena table name of this FeatureGroup.
92
- record_identifier_feature_name (str): A string representing the record identifier feature.
64
+ record_dentifier_feature_name (str): A string representing the record identifier feature.
93
65
event_time_identifier_feature (FeatureDefinition): A FeatureDefinition representing the
94
66
event time identifier feature.
95
67
target_feature_name_in_base (str): A string representing the feature name in base which will
96
68
be used as target join key (default: None).
97
69
table_type (TableType): A TableType representing the type of table if it is Feature Group or
98
70
Panda Data Frame (default: None).
99
- feature_name_in_target (str): A string representing the feature name in the target feature
100
- group that will be compared to the target feature in the base feature group.
101
- If None is provided, the record identifier feature will be used in the
102
- SQL join. (default: None).
103
- join_comparator (JoinComparatorEnum): A JoinComparatorEnum representing the comparator
104
- used when joining the target feature in the base feature group and the feature
105
- in the target feature group. (default: JoinComparatorEnum.EQUALS).
106
- join_type (JoinTypeEnum): A JoinTypeEnum representing the type of join between
107
- the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).
108
71
"""
109
72
110
73
features : List [str ] = attr .ib ()
@@ -117,18 +80,12 @@ class FeatureGroupToBeMerged:
117
80
event_time_identifier_feature : FeatureDefinition = attr .ib ()
118
81
target_feature_name_in_base : str = attr .ib (default = None )
119
82
table_type : TableType = attr .ib (default = None )
120
- feature_name_in_target : str = attr .ib (default = None )
121
- join_comparator : JoinComparatorEnum = attr .ib (default = JoinComparatorEnum .EQUALS )
122
- join_type : JoinTypeEnum = attr .ib (default = JoinTypeEnum .INNER_JOIN )
123
83
124
84
125
85
def construct_feature_group_to_be_merged (
126
- target_feature_group : FeatureGroup ,
86
+ feature_group : FeatureGroup ,
127
87
included_feature_names : List [str ],
128
88
target_feature_name_in_base : str = None ,
129
- feature_name_in_target : str = None ,
130
- join_comparator : JoinComparatorEnum = JoinComparatorEnum .EQUALS ,
131
- join_type : JoinTypeEnum = JoinTypeEnum .INNER_JOIN ,
132
89
) -> FeatureGroupToBeMerged :
133
90
"""Construct a FeatureGroupToBeMerged object by provided parameters.
134
91
@@ -138,29 +95,18 @@ def construct_feature_group_to_be_merged(
138
95
included in the output.
139
96
target_feature_name_in_base (str): A string representing the feature name in base which
140
97
will be used as target join key (default: None).
141
- feature_name_in_target (str): A string representing the feature name in the target feature
142
- group that will be compared to the target feature in the base feature group.
143
- If None is provided, the record identifier feature will be used in the
144
- SQL join. (default: None).
145
- join_comparator (JoinComparatorEnum): A JoinComparatorEnum representing the comparator
146
- used when joining the target feature in the base feature group and the feature
147
- in the target feature group. (default: JoinComparatorEnum.EQUALS).
148
- join_type (JoinTypeEnum): A JoinTypeEnum representing the type of join between
149
- the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).
150
98
Returns:
151
99
A FeatureGroupToBeMerged object.
152
100
153
101
Raises:
154
102
ValueError: Invalid feature name(s) in included_feature_names.
155
103
"""
156
- feature_group_metadata = target_feature_group .describe ()
104
+ feature_group_metadata = feature_group .describe ()
157
105
data_catalog_config = feature_group_metadata .get ("OfflineStoreConfig" , {}).get (
158
106
"DataCatalogConfig" , None
159
107
)
160
108
if not data_catalog_config :
161
- raise RuntimeError (
162
- f"No metastore is configured with FeatureGroup { target_feature_group .name } ."
163
- )
109
+ raise RuntimeError (f"No metastore is configured with FeatureGroup { feature_group .name } ." )
164
110
165
111
record_identifier_feature_name = feature_group_metadata .get ("RecordIdentifierFeatureName" , None )
166
112
feature_definitions = feature_group_metadata .get ("FeatureDefinitions" , [])
@@ -180,15 +126,10 @@ def construct_feature_group_to_be_merged(
180
126
catalog = data_catalog_config .get ("Catalog" , None ) if disable_glue else _DEFAULT_CATALOG
181
127
features = [feature .get ("FeatureName" , None ) for feature in feature_definitions ]
182
128
183
- if feature_name_in_target is not None and feature_name_in_target not in features :
184
- raise ValueError (
185
- f"Feature { feature_name_in_target } not found in FeatureGroup { target_feature_group .name } "
186
- )
187
-
188
129
for included_feature in included_feature_names or []:
189
130
if included_feature not in features :
190
131
raise ValueError (
191
- f"Feature { included_feature } not found in FeatureGroup { target_feature_group .name } "
132
+ f"Feature { included_feature } not found in FeatureGroup { feature_group .name } "
192
133
)
193
134
if not included_feature_names :
194
135
included_feature_names = features
@@ -210,9 +151,6 @@ def construct_feature_group_to_be_merged(
210
151
FeatureDefinition (event_time_identifier_feature_name , event_time_identifier_feature_type ),
211
152
target_feature_name_in_base ,
212
153
TableType .FEATURE_GROUP ,
213
- feature_name_in_target ,
214
- join_comparator ,
215
- join_type ,
216
154
)
217
155
218
156
@@ -289,38 +227,21 @@ def with_feature_group(
289
227
feature_group : FeatureGroup ,
290
228
target_feature_name_in_base : str = None ,
291
229
included_feature_names : List [str ] = None ,
292
- feature_name_in_target : str = None ,
293
- join_comparator : JoinComparatorEnum = JoinComparatorEnum .EQUALS ,
294
- join_type : JoinTypeEnum = JoinTypeEnum .INNER_JOIN ,
295
230
):
296
231
"""Join FeatureGroup with base.
297
232
298
233
Args:
299
- feature_group (FeatureGroup): A target FeatureGroup which will be joined to base.
234
+ feature_group (FeatureGroup): A FeatureGroup which will be joined to base.
300
235
target_feature_name_in_base (str): A string representing the feature name in base which
301
- will be used as a join key (default: None).
236
+ will be used as target join key (default: None).
302
237
included_feature_names (List[str]): A list of strings representing features to be
303
238
included in the output (default: None).
304
- feature_name_in_target (str): A string representing the feature name in the target
305
- feature group that will be compared to the target feature in the base feature group.
306
- If None is provided, the record identifier feature will be used in the
307
- SQL join. (default: None).
308
- join_comparator (JoinComparatorEnum): A JoinComparatorEnum representing the comparator
309
- used when joining the target feature in the base feature group and the feature
310
- in the target feature group. (default: JoinComparatorEnum.EQUALS).
311
- join_type (JoinTypeEnum): A JoinTypeEnum representing the type of join between
312
- the base and target feature groups. (default: JoinTypeEnum.INNER_JOIN).
313
- Returns:
314
- This DatasetBuilder object.
239
+ Returns:
240
+ This DatasetBuilder object.
315
241
"""
316
242
self ._feature_groups_to_be_merged .append (
317
243
construct_feature_group_to_be_merged (
318
- feature_group ,
319
- included_feature_names ,
320
- target_feature_name_in_base ,
321
- feature_name_in_target ,
322
- join_comparator ,
323
- join_type ,
244
+ feature_group , included_feature_names , target_feature_name_in_base
324
245
)
325
246
)
326
247
return self
@@ -984,18 +905,10 @@ def _construct_join_condition(self, feature_group: FeatureGroupToBeMerged, suffi
984
905
Returns:
985
906
The JOIN query string.
986
907
"""
987
-
988
- feature_name_in_target = (
989
- feature_group .feature_name_in_target
990
- if feature_group .feature_name_in_target is not None
991
- else feature_group .record_identifier_feature_name
992
- )
993
-
994
908
join_condition_string = (
995
- f"\n { feature_group .join_type .value } fg_{ suffix } \n "
996
- + f'ON fg_base."{ feature_group .target_feature_name_in_base } "'
997
- + f" { feature_group .join_comparator .value } "
998
- + f'fg_{ suffix } ."{ feature_name_in_target } "'
909
+ f"\n JOIN fg_{ suffix } \n "
910
+ + f'ON fg_base."{ feature_group .target_feature_name_in_base } " = '
911
+ + f'fg_{ suffix } ."{ feature_group .record_identifier_feature_name } "'
999
912
)
1000
913
base_timestamp_cast_function_name = "from_unixtime"
1001
914
if self ._event_time_identifier_feature_type == FeatureTypeEnum .STRING :
0 commit comments