Skip to content

Commit 090225f

Browse files
author
Keshav Chandak
committed
Support to get latest monitoring execution processing logs
1 parent c6d412f commit 090225f

File tree

5 files changed

+130
-4
lines changed

5 files changed

+130
-4
lines changed

src/sagemaker/model_monitor/clarify_model_monitoring.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from sagemaker.session import Session
2828
from sagemaker.utils import name_from_base
2929
from sagemaker.clarify import SageMakerClarifyProcessor, ModelPredictedLabelConfig
30+
from sagemaker.lineage._utils import get_resource_name_from_arn
3031

3132
_LOGGER = logging.getLogger(__name__)
3233

@@ -154,6 +155,26 @@ def list_executions(self):
154155
for execution in executions
155156
]
156157

158+
def get_latest_execution_logs(self, wait=False):
159+
"""Get the processing job logs for the most recent monitoring execution
160+
161+
Args:
162+
wait (bool): Whether the call should wait until the job completes (default: False).
163+
164+
Returns: None
165+
"""
166+
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
167+
monitoring_schedule_name=self.monitoring_schedule_name
168+
)
169+
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
170+
raise ValueError("No execution jobs were kicked off.")
171+
if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
172+
raise ValueError("Processing Job did not run for the last execution")
173+
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
174+
self.sagemaker_session.logs_for_processing_job(
175+
job_name=get_resource_name_from_arn(job_arn), wait=wait
176+
)
177+
157178
def _create_baselining_processor(self):
158179
"""Create and return a SageMakerClarifyProcessor object which will run the baselining job.
159180

src/sagemaker/model_monitor/model_monitoring.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
resolve_value_from_config,
6464
resolve_class_attribute_from_config,
6565
)
66+
from sagemaker.lineage._utils import get_resource_name_from_arn
6667

6768
DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"
6869

@@ -768,6 +769,26 @@ def list_executions(self):
768769

769770
return monitoring_executions
770771

772+
def get_latest_execution_logs(self, wait=False):
773+
"""Get the processing job logs for the most recent monitoring execution
774+
775+
Args:
776+
wait (bool): Whether the call should wait until the job completes (default: False).
777+
778+
Returns: None
779+
"""
780+
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
781+
monitoring_schedule_name=self.monitoring_schedule_name
782+
)
783+
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
784+
raise ValueError("No execution jobs were kicked off.")
785+
if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
786+
raise ValueError("Processing Job did not run for the last execution")
787+
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
788+
self.sagemaker_session.logs_for_processing_job(
789+
job_name=get_resource_name_from_arn(job_arn), wait=wait
790+
)
791+
771792
def update_monitoring_alert(
772793
self,
773794
monitoring_alert_name: str,

tests/integ/test_clarify_model_monitor.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,9 @@ def test_bias_monitor(sagemaker_session, scheduled_bias_monitor, endpoint_name,
293293
)
294294
@pytest.mark.flaky(reruns=5, reruns_delay=2)
295295
def test_run_bias_monitor(
296-
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, upload_actual_data
296+
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, capfd
297297
):
298-
_verify_execution_status(scheduled_bias_monitor)
298+
_verify_execution_status(scheduled_bias_monitor, capfd=capfd)
299299

300300
_verify_bias_job_description(
301301
sagemaker_session=sagemaker_session,
@@ -408,8 +408,9 @@ def test_run_explainability_monitor(
408408
endpoint_name,
409409
ground_truth_input,
410410
upload_actual_data,
411+
capfd,
411412
):
412-
_verify_execution_status(scheduled_explainability_monitor)
413+
_verify_execution_status(scheduled_explainability_monitor, capfd=capfd)
413414

414415
_verify_explainability_job_description(
415416
sagemaker_session=sagemaker_session,
@@ -514,11 +515,12 @@ def _verify_job_description(
514515
)
515516

516517

517-
def _verify_execution_status(monitor):
518+
def _verify_execution_status(monitor, capfd):
518519
_wait_for_completion(monitor)
519520
executions = monitor.list_executions()
520521
assert len(executions) > 0
521522
schedule_desc = monitor.describe_schedule()
523+
_check_processing_logs_generated(schedule_description=schedule_desc, capfd=capfd)
522524
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
523525
last_execution_status = execution_summary["MonitoringExecutionStatus"]
524526
assert last_execution_status in ["Completed", "CompletedWithViolations"]
@@ -600,3 +602,10 @@ def _wait_for_completion(monitor):
600602
# End this loop once the execution has reached a terminal state.
601603
if last_execution_status in ["Completed", "CompletedWithViolations", "Failed", "Stopped"]:
602604
break
605+
606+
607+
def _check_processing_logs_generated(self, schedule_description, capfd):
608+
self.get_latest_execution_logs(wait=False)
609+
out, _ = capfd.readouterr()
610+
assert len(out) > 0
611+
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out

tests/integ/test_model_monitor.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,44 @@ def test_default_monitor_suggest_baseline_and_create_monitoring_schedule_with_cu
486486
assert len(summary["MonitoringScheduleSummaries"]) > 0
487487

488488

489+
def test_default_monitor_display_logs_errors(sagemaker_session, predictor, capfd):
490+
my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)
491+
492+
my_default_monitor.create_monitoring_schedule(
493+
endpoint_input=predictor.endpoint_name,
494+
schedule_cron_expression=CronExpressionGenerator.hourly(),
495+
)
496+
497+
schedule_description = my_default_monitor.describe_schedule()
498+
try:
499+
my_default_monitor.get_latest_execution_logs(wait=False)
500+
except ValueError as ve:
501+
assert "No execution jobs were kicked off." in str(ve)
502+
503+
for _ in retries(
504+
max_retry_count=100,
505+
exception_message_prefix="Waiting for the an execution to start",
506+
seconds_to_sleep=50,
507+
):
508+
schedule_desc = my_default_monitor.describe_schedule()
509+
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
510+
last_execution_status = None
511+
512+
# Once there is an execution, get its status
513+
if execution_summary is not None:
514+
last_execution_status = execution_summary["MonitoringExecutionStatus"]
515+
# End this loop once the execution has reached a terminal state.
516+
if last_execution_status is not None:
517+
break
518+
try:
519+
my_default_monitor.get_latest_execution_logs(wait=False)
520+
except ValueError as ve:
521+
assert "Processing Job did not run for the last execution" in str(ve)
522+
523+
my_default_monitor.stop_monitoring_schedule()
524+
my_default_monitor.delete_monitoring_schedule()
525+
526+
489527
@pytest.mark.skipif(
490528
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
491529
reason="ModelMonitoring is not yet supported in this region.",
@@ -1643,6 +1681,7 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
16431681
output_kms_key,
16441682
updated_volume_kms_key,
16451683
updated_output_kms_key,
1684+
capfd,
16461685
):
16471686
baseline_dataset = os.path.join(DATA_DIR, "monitor/baseline_dataset.csv")
16481687

@@ -1771,6 +1810,10 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
17711810

17721811
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
17731812

1813+
_check_processing_logs_generated(
1814+
monitor=my_attached_monitor, schedule_description=schedule_description, capfd=capfd
1815+
)
1816+
17741817
my_attached_monitor.stop_monitoring_schedule()
17751818

17761819
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
@@ -1877,6 +1920,13 @@ def test_default_monitor_monitoring_alerts(sagemaker_session, predictor):
18771920
my_default_monitor.delete_monitoring_schedule()
18781921

18791922

1923+
def _check_processing_logs_generated(monitor, schedule_description, capfd):
1924+
monitor.get_latest_execution_logs(wait=False)
1925+
out, _ = capfd.readouterr()
1926+
assert len(out) > 0
1927+
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out
1928+
1929+
18801930
def _wait_for_schedule_changes_to_apply(monitor):
18811931
"""Waits for the monitor to no longer be in the 'Pending' state. Updates take under a minute
18821932
to apply.

tests/unit/sagemaker/monitor/test_clarify_model_monitor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@
221221
"NetworkConfig": NETWORK_CONFIG._to_request_dict(),
222222
}
223223

224+
MONITORING_EXECUTIONS_EMPTY = {
225+
"MonitoringExecutionSummaries": [],
226+
}
227+
228+
MONITORING_EXECUTIONS_NO_PROCESSING_JOB = {
229+
"MonitoringExecutionSummaries": [{"MonitoringSchedule": "MonitoringSchedule"}],
230+
}
231+
224232
# For update API
225233
NEW_ROLE_ARN = "arn:aws:iam::012345678902:role/{}".format(ROLE)
226234
NEW_INSTANCE_COUNT = 2
@@ -1716,3 +1724,20 @@ def _test_model_explainability_monitor_delete_schedule(
17161724
sagemaker_session.sagemaker_client.delete_model_explainability_job_definition.assert_called_once_with(
17171725
JobDefinitionName=job_definition_name
17181726
)
1727+
1728+
1729+
def test_model_explainability_monitor_logs_failure(model_explainability_monitor, sagemaker_session):
1730+
sagemaker_session.list_monitoring_executions = MagicMock(
1731+
return_value=MONITORING_EXECUTIONS_EMPTY
1732+
)
1733+
try:
1734+
model_explainability_monitor.get_latest_execution_logs()
1735+
except ValueError as ve:
1736+
assert "No execution jobs were kicked off." in str(ve)
1737+
sagemaker_session.list_monitoring_executions = MagicMock(
1738+
return_value=MONITORING_EXECUTIONS_NO_PROCESSING_JOB
1739+
)
1740+
try:
1741+
model_explainability_monitor.get_latest_execution_logs()
1742+
except ValueError as ve:
1743+
assert "Processing Job did not run for the last execution" in str(ve)

0 commit comments

Comments
 (0)