Skip to content

Commit 0147ff8

Browse files
keshav-chandakKeshav Chandak
authored andcommitted
Merge pull request #1 from keshav-chandak/feature/latest_exeution_logs
Support to get latest monitoring execution processing logs
2 parents 80102e5 + 090225f commit 0147ff8

File tree

5 files changed

+129
-4
lines changed

5 files changed

+129
-4
lines changed

src/sagemaker/model_monitor/clarify_model_monitoring.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from sagemaker.session import Session
2828
from sagemaker.utils import name_from_base
2929
from sagemaker.clarify import SageMakerClarifyProcessor, ModelPredictedLabelConfig
30+
from sagemaker.lineage._utils import get_resource_name_from_arn
3031

3132
_LOGGER = logging.getLogger(__name__)
3233

@@ -154,6 +155,26 @@ def list_executions(self):
154155
for execution in executions
155156
]
156157

158+
def get_latest_execution_logs(self, wait=False):
159+
"""Get the processing job logs for the most recent monitoring execution
160+
161+
Args:
162+
wait (bool): Whether the call should wait until the job completes (default: False).
163+
164+
Returns: None
165+
"""
166+
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
167+
monitoring_schedule_name=self.monitoring_schedule_name
168+
)
169+
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
170+
raise ValueError("No execution jobs were kicked off.")
171+
if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
172+
raise ValueError("Processing Job did not run for the last execution")
173+
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
174+
self.sagemaker_session.logs_for_processing_job(
175+
job_name=get_resource_name_from_arn(job_arn), wait=wait
176+
)
177+
157178
def _create_baselining_processor(self):
158179
"""Create and return a SageMakerClarifyProcessor object which will run the baselining job.
159180

src/sagemaker/model_monitor/model_monitoring.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
resolve_value_from_config,
6464
resolve_class_attribute_from_config,
6565
)
66+
from sagemaker.lineage._utils import get_resource_name_from_arn
6667

6768
DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"
6869

@@ -768,6 +769,26 @@ def list_executions(self):
768769

769770
return monitoring_executions
770771

772+
def get_latest_execution_logs(self, wait=False):
773+
"""Get the processing job logs for the most recent monitoring execution
774+
775+
Args:
776+
wait (bool): Whether the call should wait until the job completes (default: False).
777+
778+
Returns: None
779+
"""
780+
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
781+
monitoring_schedule_name=self.monitoring_schedule_name
782+
)
783+
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
784+
raise ValueError("No execution jobs were kicked off.")
785+
if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
786+
raise ValueError("Processing Job did not run for the last execution")
787+
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
788+
self.sagemaker_session.logs_for_processing_job(
789+
job_name=get_resource_name_from_arn(job_arn), wait=wait
790+
)
791+
771792
def update_monitoring_alert(
772793
self,
773794
monitoring_alert_name: str,

tests/integ/test_clarify_model_monitor.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,9 @@ def test_bias_monitor(sagemaker_session, scheduled_bias_monitor, endpoint_name,
293293
)
294294
@pytest.mark.flaky(reruns=5, reruns_delay=2)
295295
def test_run_bias_monitor(
296-
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, upload_actual_data
296+
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, capfd
297297
):
298-
_verify_execution_status(scheduled_bias_monitor)
298+
_verify_execution_status(scheduled_bias_monitor, capfd=capfd)
299299

300300
_verify_bias_job_description(
301301
sagemaker_session=sagemaker_session,
@@ -408,8 +408,9 @@ def test_run_explainability_monitor(
408408
endpoint_name,
409409
ground_truth_input,
410410
upload_actual_data,
411+
capfd,
411412
):
412-
_verify_execution_status(scheduled_explainability_monitor)
413+
_verify_execution_status(scheduled_explainability_monitor, capfd=capfd)
413414

414415
_verify_explainability_job_description(
415416
sagemaker_session=sagemaker_session,
@@ -514,11 +515,12 @@ def _verify_job_description(
514515
)
515516

516517

517-
def _verify_execution_status(monitor):
518+
def _verify_execution_status(monitor, capfd):
518519
_wait_for_completion(monitor)
519520
executions = monitor.list_executions()
520521
assert len(executions) > 0
521522
schedule_desc = monitor.describe_schedule()
523+
_check_processing_logs_generated(schedule_description=schedule_desc, capfd=capfd)
522524
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
523525
last_execution_status = execution_summary["MonitoringExecutionStatus"]
524526
assert last_execution_status in ["Completed", "CompletedWithViolations"]
@@ -600,3 +602,10 @@ def _wait_for_completion(monitor):
600602
# End this loop once the execution has reached a terminal state.
601603
if last_execution_status in ["Completed", "CompletedWithViolations", "Failed", "Stopped"]:
602604
break
605+
606+
607+
def _check_processing_logs_generated(self, schedule_description, capfd):
608+
self.get_latest_execution_logs(wait=False)
609+
out, _ = capfd.readouterr()
610+
assert len(out) > 0
611+
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out

tests/integ/test_model_monitor.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,43 @@ def test_default_monitor_suggest_baseline_and_create_monitoring_schedule_with_cu
486486
assert len(summary["MonitoringScheduleSummaries"]) > 0
487487

488488

489+
def test_default_monitor_display_logs_errors(sagemaker_session, predictor, capfd):
490+
my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)
491+
492+
my_default_monitor.create_monitoring_schedule(
493+
endpoint_input=predictor.endpoint_name,
494+
schedule_cron_expression=CronExpressionGenerator.hourly(),
495+
)
496+
497+
try:
498+
my_default_monitor.get_latest_execution_logs(wait=False)
499+
except ValueError as ve:
500+
assert "No execution jobs were kicked off." in str(ve)
501+
502+
for _ in retries(
503+
max_retry_count=100,
504+
exception_message_prefix="Waiting for the an execution to start",
505+
seconds_to_sleep=50,
506+
):
507+
schedule_desc = my_default_monitor.describe_schedule()
508+
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
509+
last_execution_status = None
510+
511+
# Once there is an execution, get its status
512+
if execution_summary is not None:
513+
last_execution_status = execution_summary["MonitoringExecutionStatus"]
514+
# End this loop once the execution has reached a terminal state.
515+
if last_execution_status is not None:
516+
break
517+
try:
518+
my_default_monitor.get_latest_execution_logs(wait=False)
519+
except ValueError as ve:
520+
assert "Processing Job did not run for the last execution" in str(ve)
521+
522+
my_default_monitor.stop_monitoring_schedule()
523+
my_default_monitor.delete_monitoring_schedule()
524+
525+
489526
@pytest.mark.skipif(
490527
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
491528
reason="ModelMonitoring is not yet supported in this region.",
@@ -1643,6 +1680,7 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
16431680
output_kms_key,
16441681
updated_volume_kms_key,
16451682
updated_output_kms_key,
1683+
capfd,
16461684
):
16471685
baseline_dataset = os.path.join(DATA_DIR, "monitor/baseline_dataset.csv")
16481686

@@ -1771,6 +1809,10 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
17711809

17721810
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
17731811

1812+
_check_processing_logs_generated(
1813+
monitor=my_attached_monitor, schedule_description=schedule_description, capfd=capfd
1814+
)
1815+
17741816
my_attached_monitor.stop_monitoring_schedule()
17751817

17761818
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
@@ -1877,6 +1919,13 @@ def test_default_monitor_monitoring_alerts(sagemaker_session, predictor):
18771919
my_default_monitor.delete_monitoring_schedule()
18781920

18791921

1922+
def _check_processing_logs_generated(monitor, schedule_description, capfd):
1923+
monitor.get_latest_execution_logs(wait=False)
1924+
out, _ = capfd.readouterr()
1925+
assert len(out) > 0
1926+
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out
1927+
1928+
18801929
def _wait_for_schedule_changes_to_apply(monitor):
18811930
"""Waits for the monitor to no longer be in the 'Pending' state. Updates take under a minute
18821931
to apply.

tests/unit/sagemaker/monitor/test_clarify_model_monitor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@
221221
"NetworkConfig": NETWORK_CONFIG._to_request_dict(),
222222
}
223223

224+
MONITORING_EXECUTIONS_EMPTY = {
225+
"MonitoringExecutionSummaries": [],
226+
}
227+
228+
MONITORING_EXECUTIONS_NO_PROCESSING_JOB = {
229+
"MonitoringExecutionSummaries": [{"MonitoringSchedule": "MonitoringSchedule"}],
230+
}
231+
224232
# For update API
225233
NEW_ROLE_ARN = "arn:aws:iam::012345678902:role/{}".format(ROLE)
226234
NEW_INSTANCE_COUNT = 2
@@ -1716,3 +1724,20 @@ def _test_model_explainability_monitor_delete_schedule(
17161724
sagemaker_session.sagemaker_client.delete_model_explainability_job_definition.assert_called_once_with(
17171725
JobDefinitionName=job_definition_name
17181726
)
1727+
1728+
1729+
def test_model_explainability_monitor_logs_failure(model_explainability_monitor, sagemaker_session):
1730+
sagemaker_session.list_monitoring_executions = MagicMock(
1731+
return_value=MONITORING_EXECUTIONS_EMPTY
1732+
)
1733+
try:
1734+
model_explainability_monitor.get_latest_execution_logs()
1735+
except ValueError as ve:
1736+
assert "No execution jobs were kicked off." in str(ve)
1737+
sagemaker_session.list_monitoring_executions = MagicMock(
1738+
return_value=MONITORING_EXECUTIONS_NO_PROCESSING_JOB
1739+
)
1740+
try:
1741+
model_explainability_monitor.get_latest_execution_logs()
1742+
except ValueError as ve:
1743+
assert "Processing Job did not run for the last execution" in str(ve)

0 commit comments

Comments
 (0)