Skip to content

Commit 2e6f4f7

Browse files
authored
Merge branch 'master' into custom
2 parents 709fe0c + bc8a0d2 commit 2e6f4f7

File tree

8 files changed

+90
-29
lines changed

8 files changed

+90
-29
lines changed

CHANGELOG.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,23 @@
11
# Changelog
22

3+
## v2.29.1 (2021-03-09)
4+
5+
### Bug Fixes and Other Changes
6+
7+
* return all failed row indices in feature_group.ingest
8+
* move service-role path parsing for AmazonSageMaker-ExecutionRole for get_execution_role() into except block of IAM get_role() call and add warning message
9+
* add description parameter for RegisterModelStep
10+
* add type annotations for Lineage
11+
12+
### Documentation Changes
13+
14+
* remove ellipsis from CHANGELOG.md
15+
316
## v2.29.0 (2021-03-04)
417

518
### Features
619

7-
* add support for TensorFlow 2.4.1 for training, inference and
20+
* add support for TensorFlow 2.4.1 for training, inference and data parallel
821
* Support profiler config in the pipeline training job step
922
* support PyTorch 1.7.1 training, inference and data parallel
1023

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
2.29.1.dev0
1+
2.29.2.dev0

setup.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,11 @@ def read_version():
4242
"smdebug_rulesconfig==1.0.1",
4343
"importlib-metadata>=1.4.0",
4444
"packaging>=20.0",
45+
"pandas",
4546
]
4647

4748
# Specific use case dependencies
4849
extras = {
49-
"analytics": ["pandas"],
50-
"feature-store": ["pandas"],
5150
"local": [
5251
"urllib3>=1.21.1,!=1.25,!=1.25.1",
5352
"docker-compose>=1.25.2",

src/sagemaker/feature_store/feature_group.py

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ class IngestionManagerPandas:
160160
data_frame: DataFrame = attr.ib()
161161
max_workers: int = attr.ib(default=1)
162162
_futures: Dict[Any, Any] = attr.ib(init=False, factory=dict)
163+
_failed_indices: List[int] = attr.ib(factory=list)
163164

164165
@staticmethod
165166
def _ingest_single_batch(
@@ -168,7 +169,7 @@ def _ingest_single_batch(
168169
sagemaker_session: Session,
169170
start_index: int,
170171
end_index: int,
171-
):
172+
) -> List[int]:
172173
"""Ingest a single batch of DataFrame rows into FeatureStore.
173174
174175
Args:
@@ -177,19 +178,38 @@ def _ingest_single_batch(
177178
sagemaker_session (Session): session instance to perform boto calls.
178179
start_index (int): starting position to ingest in this batch.
179180
end_index (int): ending position to ingest in this batch.
181+
182+
Returns:
183+
List of row indices that failed to be ingested.
180184
"""
181185
logger.info("Started ingesting index %d to %d", start_index, end_index)
182-
for row in data_frame[start_index:end_index].itertuples(index=False):
186+
failed_rows = list()
187+
for row in data_frame[start_index:end_index].itertuples():
183188
record = [
184189
FeatureValue(
185-
feature_name=data_frame.columns[index], value_as_string=str(row[index])
190+
feature_name=data_frame.columns[index - 1], value_as_string=str(row[index])
186191
)
187-
for index in range(len(row))
192+
for index in range(1, len(row))
188193
if pd.notna(row[index])
189194
]
190-
sagemaker_session.put_record(
191-
feature_group_name=feature_group_name, record=[value.to_dict() for value in record]
192-
)
195+
try:
196+
sagemaker_session.put_record(
197+
feature_group_name=feature_group_name,
198+
record=[value.to_dict() for value in record],
199+
)
200+
except Exception as e: # pylint: disable=broad-except
201+
logger.error("Failed to ingest row %d: %s", row[0], e)
202+
failed_rows.append(row[0])
203+
return failed_rows
204+
205+
@property
206+
def failed_rows(self) -> List[int]:
207+
"""Get rows that failed to ingest
208+
209+
Returns:
210+
List of row indices that failed to be ingested.
211+
"""
212+
return self._failed_indices
193213

194214
def wait(self, timeout=None):
195215
"""Wait for the ingestion process to finish.
@@ -198,18 +218,17 @@ def wait(self, timeout=None):
198218
timeout (Union[int, float]): ``concurrent.futures.TimeoutError`` will be raised
199219
if timeout is reached.
200220
"""
201-
failed = False
221+
self._failed_indices = list()
202222
for future in as_completed(self._futures, timeout=timeout):
203223
start, end = self._futures[future]
204-
try:
205-
future.result()
206-
except Exception as e: # pylint: disable=broad-except
207-
failed = True
208-
logger.error("Failed to ingest row %d to %d: %s", start, end, e)
224+
result = future.result()
225+
if result:
226+
logger.error("Failed to ingest row %d to %d", start, end)
209227
else:
210228
logger.info("Successfully ingested row %d to %d", start, end)
229+
self._failed_indices += result
211230

212-
if failed:
231+
if len(self._failed_indices) > 0:
213232
raise RuntimeError(
214233
f"Failed to ingest some data into FeatureGroup {self.feature_group_name}"
215234
)

src/sagemaker/session.py

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3498,14 +3498,6 @@ def get_caller_identity_arn(self):
34983498
endpoint_url=sts_regional_endpoint(self.boto_region_name),
34993499
).get_caller_identity()["Arn"]
35003500

3501-
if "AmazonSageMaker-ExecutionRole" in assumed_role:
3502-
role = re.sub(
3503-
r"^(.+)sts::(\d+):assumed-role/(.+?)/.*$",
3504-
r"\1iam::\2:role/service-role/\3",
3505-
assumed_role,
3506-
)
3507-
return role
3508-
35093501
role = re.sub(r"^(.+)sts::(\d+):assumed-role/(.+?)/.*$", r"\1iam::\2:role/\3", assumed_role)
35103502

35113503
# Call IAM to get the role's path
@@ -3518,6 +3510,24 @@ def get_caller_identity_arn(self):
35183510
role_name,
35193511
)
35203512

3513+
# This conditional has been present since the inception of SageMaker
3514+
# Guessing this conditional's purpose was to handle lack of IAM permissions
3515+
# https://github.com/aws/sagemaker-python-sdk/issues/2089#issuecomment-791802713
3516+
if "AmazonSageMaker-ExecutionRole" in assumed_role:
3517+
LOGGER.warning(
3518+
"Assuming role was created in SageMaker AWS console, "
3519+
"as the name contains `AmazonSageMaker-ExecutionRole`. "
3520+
"Defaulting to Role ARN with service-role in path. "
3521+
"If this Role ARN is incorrect, please add "
3522+
"IAM read permissions to your role or supply the "
3523+
"Role Arn directly."
3524+
)
3525+
role = re.sub(
3526+
r"^(.+)sts::(\d+):assumed-role/(.+?)/.*$",
3527+
r"\1iam::\2:role/service-role/\3",
3528+
assumed_role,
3529+
)
3530+
35213531
return role
35223532

35233533
def logs_for_job( # noqa: C901 - suppress complexity warning for this method

tests/integ/test_feature_store.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ def test_create_feature_store(
197197
data_frame=pandas_data_frame, max_workers=3, wait=False
198198
)
199199
ingestion_manager.wait()
200+
assert 0 == len(ingestion_manager.failed_rows)
200201

201202
# Query the integrated Glue table.
202203
athena_query = feature_group.athena_query()

tests/unit/sagemaker/feature_store/test_feature_store.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,19 +274,20 @@ def test_ingestion_manager_run_success():
274274

275275
@patch(
276276
"sagemaker.feature_store.feature_group.IngestionManagerPandas._ingest_single_batch",
277-
MagicMock(side_effect=Exception("Failed!")),
277+
MagicMock(return_value=[1]),
278278
)
279279
def test_ingestion_manager_run_failure():
280280
df = pd.DataFrame({"float": pd.Series([2.0], dtype="float64")})
281281
manager = IngestionManagerPandas(
282282
feature_group_name="MyGroup",
283283
sagemaker_session=sagemaker_session_mock,
284284
data_frame=df,
285-
max_workers=10,
285+
max_workers=1,
286286
)
287287
with pytest.raises(RuntimeError) as error:
288288
manager.run()
289289
assert "Failed to ingest some data into FeatureGroup MyGroup" in str(error)
290+
assert manager.failed_rows == [1]
290291

291292

292293
@pytest.fixture

tests/unit/test_session.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -398,12 +398,30 @@ def test_get_caller_identity_arn_from_a_role(sts_regional_endpoint, boto_session
398398
@patch("os.path.exists", side_effect=mock_exists(NOTEBOOK_METADATA_FILE, False))
399399
@patch("sagemaker.session.sts_regional_endpoint", return_value=STS_ENDPOINT)
400400
def test_get_caller_identity_arn_from_an_execution_role(sts_regional_endpoint, boto_session):
401+
sess = Session(boto_session)
402+
sts_arn = "arn:aws:sts::369233609183:assumed-role/AmazonSageMaker-ExecutionRole-20171129T072388/SageMaker"
403+
sess.boto_session.client("sts", endpoint_url=STS_ENDPOINT).get_caller_identity.return_value = {
404+
"Arn": sts_arn
405+
}
406+
iam_arn = "arn:aws:iam::369233609183:role/AmazonSageMaker-ExecutionRole-20171129T072388"
407+
sess.boto_session.client("iam").get_role.return_value = {"Role": {"Arn": iam_arn}}
408+
409+
actual = sess.get_caller_identity_arn()
410+
assert actual == iam_arn
411+
412+
413+
@patch("os.path.exists", side_effect=mock_exists(NOTEBOOK_METADATA_FILE, False))
414+
@patch("sagemaker.session.sts_regional_endpoint", return_value=STS_ENDPOINT)
415+
def test_get_caller_identity_arn_from_a_sagemaker_execution_role_with_iam_client_error(
416+
sts_regional_endpoint, boto_session
417+
):
401418
sess = Session(boto_session)
402419
arn = "arn:aws:sts::369233609183:assumed-role/AmazonSageMaker-ExecutionRole-20171129T072388/SageMaker"
403420
sess.boto_session.client("sts", endpoint_url=STS_ENDPOINT).get_caller_identity.return_value = {
404421
"Arn": arn
405422
}
406-
sess.boto_session.client("iam").get_role.return_value = {"Role": {"Arn": arn}}
423+
424+
sess.boto_session.client("iam").get_role.side_effect = ClientError({}, {})
407425

408426
actual = sess.get_caller_identity_arn()
409427
assert (

0 commit comments

Comments
 (0)