Skip to content

Commit ed15ab5

Browse files
author
Keshav Chandak
committed
Support to get latest monitoring execution processing logs
1 parent e0102e7 commit ed15ab5

File tree

5 files changed

+139
-5
lines changed

5 files changed

+139
-5
lines changed

src/sagemaker/model_monitor/clarify_model_monitoring.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
from sagemaker.session import Session
2828
from sagemaker.utils import name_from_base
2929
from sagemaker.clarify import SageMakerClarifyProcessor, ModelPredictedLabelConfig
30+
from sagemaker.lineage._utils import get_resource_name_from_arn
3031

3132
_LOGGER = logging.getLogger(__name__)
3233

@@ -154,6 +155,29 @@ def list_executions(self):
154155
for execution in executions
155156
]
156157

158+
def get_latest_execution_logs(self, wait=False):
    """Get the processing job logs for the most recent monitoring execution.

    Args:
        wait (bool): Whether the call should wait until the job completes (default: False).

    Raises:
        ValueError: If no execution job or processing job for the last execution has run

    Returns: None
    """
    response = self.sagemaker_session.list_monitoring_executions(
        monitoring_schedule_name=self.monitoring_schedule_name
    )
    summaries = response["MonitoringExecutionSummaries"]
    if not summaries:
        raise ValueError("No execution jobs were kicked off.")
    # Index 0 is treated as the most recent execution; this assumes the session
    # returns summaries newest-first -- TODO confirm against the session defaults.
    job_arn = summaries[0].get("ProcessingJobArn")
    # A missing *or* empty ARN means no processing job exists for that execution,
    # so fail with the documented error instead of handing a blank ARN to the parser.
    if not job_arn:
        raise ValueError("Processing Job did not run for the last execution")
    self.sagemaker_session.logs_for_processing_job(
        job_name=get_resource_name_from_arn(job_arn), wait=wait
    )
180+
157181
def _create_baselining_processor(self):
158182
"""Create and return a SageMakerClarifyProcessor object which will run the baselining job.
159183

src/sagemaker/model_monitor/model_monitoring.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
resolve_value_from_config,
6464
resolve_class_attribute_from_config,
6565
)
66+
from sagemaker.lineage._utils import get_resource_name_from_arn
6667

6768
DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"
6869

@@ -768,6 +769,29 @@ def list_executions(self):
768769

769770
return monitoring_executions
770771

772+
def get_latest_execution_logs(self, wait=False):
    """Get the processing job logs for the most recent monitoring execution.

    Args:
        wait (bool): Whether the call should wait until the job completes (default: False).

    Raises:
        ValueError: If no execution job or processing job for the last execution has run

    Returns: None
    """
    response = self.sagemaker_session.list_monitoring_executions(
        monitoring_schedule_name=self.monitoring_schedule_name
    )
    summaries = response["MonitoringExecutionSummaries"]
    if not summaries:
        raise ValueError("No execution jobs were kicked off.")
    # Index 0 is treated as the most recent execution; this assumes the session
    # returns summaries newest-first -- TODO confirm against the session defaults.
    job_arn = summaries[0].get("ProcessingJobArn")
    # A missing *or* empty ARN means no processing job exists for that execution,
    # so fail with the documented error instead of handing a blank ARN to the parser.
    if not job_arn:
        raise ValueError("Processing Job did not run for the last execution")
    self.sagemaker_session.logs_for_processing_job(
        job_name=get_resource_name_from_arn(job_arn), wait=wait
    )
794+
771795
def update_monitoring_alert(
772796
self,
773797
monitoring_alert_name: str,

tests/integ/test_clarify_model_monitor.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,11 @@ def test_bias_monitor(sagemaker_session, scheduled_bias_monitor, endpoint_name,
293293
)
294294
@pytest.mark.flaky(reruns=5, reruns_delay=2)
295295
def test_run_bias_monitor(
296-
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, upload_actual_data
296+
scheduled_bias_monitor, sagemaker_session, endpoint_name, ground_truth_input, capfd
297297
):
298-
_verify_execution_status(scheduled_bias_monitor)
298+
_verify_execution_status(
299+
scheduled_bias_monitor, capfd=capfd, sagemaker_session=sagemaker_session
300+
)
299301

300302
_verify_bias_job_description(
301303
sagemaker_session=sagemaker_session,
@@ -408,8 +410,11 @@ def test_run_explainability_monitor(
408410
endpoint_name,
409411
ground_truth_input,
410412
upload_actual_data,
413+
capfd,
411414
):
412-
_verify_execution_status(scheduled_explainability_monitor)
415+
_verify_execution_status(
416+
scheduled_explainability_monitor, capfd=capfd, sagemaker_session=sagemaker_session
417+
)
413418

414419
_verify_explainability_job_description(
415420
sagemaker_session=sagemaker_session,
@@ -514,11 +519,14 @@ def _verify_job_description(
514519
)
515520

516521

517-
def _verify_execution_status(monitor):
522+
def _verify_execution_status(monitor, sagemaker_session, capfd):
518523
_wait_for_completion(monitor)
519-
executions = monitor.list_executions()
524+
executions = sagemaker_session.sagemaker_client.list_monitoring_executions(
525+
MonitoringScheduleName=monitor.monitoring_schedule_name
526+
)
520527
assert len(executions) > 0
521528
schedule_desc = monitor.describe_schedule()
529+
_check_processing_logs_generated(monitor=monitor, capfd=capfd)
522530
execution_summary = schedule_desc.get("LastMonitoringExecutionSummary")
523531
last_execution_status = execution_summary["MonitoringExecutionStatus"]
524532
assert last_execution_status in ["Completed", "CompletedWithViolations"]
@@ -600,3 +608,12 @@ def _wait_for_completion(monitor):
600608
# End this loop once the execution has reached a terminal state.
601609
if last_execution_status in ["Completed", "CompletedWithViolations", "Failed", "Stopped"]:
602610
break
611+
612+
613+
def _check_processing_logs_generated(monitor, capfd):
614+
try:
615+
monitor.get_latest_execution_logs(wait=False)
616+
except ValueError:
617+
pass
618+
out, _ = capfd.readouterr()
619+
assert len(out) > 0

tests/integ/test_model_monitor.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,38 @@ def test_default_monitor_suggest_baseline_and_create_monitoring_schedule_with_cu
486486
assert len(summary["MonitoringScheduleSummaries"]) > 0
487487

488488

489+
def test_default_monitor_display_logs_errors(sagemaker_session):
    """Requesting logs right after creating a schedule must raise the "no executions" error.

    A batch-transform monitoring schedule is created, the log request is checked
    for the expected ``ValueError``, and the schedule is always stopped and deleted.
    """
    my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)

    # S3 URIs always use "/" as separator, so build the URI explicitly instead of
    # relying on the operating system's path separator.
    data_captured_destination_s3_uri = "s3://{}/sagemaker-serving-batch-transform/{}".format(
        sagemaker_session.default_bucket(), uuid.uuid4()
    )

    batch_transform_input = BatchTransformInput(
        data_captured_destination_s3_uri=data_captured_destination_s3_uri,
        destination="/opt/ml/processing/output",
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    )

    my_default_monitor.create_monitoring_schedule(
        batch_transform_input=batch_transform_input,
        schedule_cron_expression=CronExpressionGenerator.hourly(),
    )

    try:
        _wait_for_schedule_changes_to_apply(monitor=my_default_monitor)

        # The call is required to raise; a silent success must fail the test.
        with pytest.raises(ValueError) as error:
            my_default_monitor.get_latest_execution_logs(wait=False)
        assert "No execution jobs were kicked off." in str(error.value)
    finally:
        # Clean up even when an assertion above fails, so the schedule is not leaked.
        my_default_monitor.stop_monitoring_schedule()
        my_default_monitor.delete_monitoring_schedule()
519+
520+
489521
@pytest.mark.skipif(
490522
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
491523
reason="ModelMonitoring is not yet supported in this region.",
@@ -1643,6 +1675,7 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
16431675
output_kms_key,
16441676
updated_volume_kms_key,
16451677
updated_output_kms_key,
1678+
capfd,
16461679
):
16471680
baseline_dataset = os.path.join(DATA_DIR, "monitor/baseline_dataset.csv")
16481681

@@ -1771,6 +1804,10 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
17711804

17721805
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
17731806

1807+
_check_processing_logs_generated(
1808+
monitor=my_attached_monitor, schedule_description=schedule_description, capfd=capfd
1809+
)
1810+
17741811
my_attached_monitor.stop_monitoring_schedule()
17751812

17761813
_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
@@ -1877,6 +1914,13 @@ def test_default_monitor_monitoring_alerts(sagemaker_session, predictor):
18771914
my_default_monitor.delete_monitoring_schedule()
18781915

18791916

1917+
def _check_processing_logs_generated(monitor, schedule_description, capfd):
1918+
monitor.get_latest_execution_logs(wait=False)
1919+
out, _ = capfd.readouterr()
1920+
assert len(out) > 0
1921+
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out
1922+
1923+
18801924
def _wait_for_schedule_changes_to_apply(monitor):
18811925
"""Waits for the monitor to no longer be in the 'Pending' state. Updates take under a minute
18821926
to apply.

tests/unit/sagemaker/monitor/test_clarify_model_monitor.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,14 @@
221221
"NetworkConfig": NETWORK_CONFIG._to_request_dict(),
222222
}
223223

224+
# Stubbed list_monitoring_executions response containing no executions at all.
MONITORING_EXECUTIONS_EMPTY = {
    "MonitoringExecutionSummaries": [],
}

# Stubbed response with one execution summary that has no "ProcessingJobArn" entry.
MONITORING_EXECUTIONS_NO_PROCESSING_JOB = {
    "MonitoringExecutionSummaries": [{"MonitoringSchedule": "MonitoringSchedule"}],
}
231+
224232
# For update API
225233
NEW_ROLE_ARN = "arn:aws:iam::012345678902:role/{}".format(ROLE)
226234
NEW_INSTANCE_COUNT = 2
@@ -1716,3 +1724,20 @@ def _test_model_explainability_monitor_delete_schedule(
17161724
sagemaker_session.sagemaker_client.delete_model_explainability_job_definition.assert_called_once_with(
17171725
JobDefinitionName=job_definition_name
17181726
)
1727+
1728+
1729+
def test_model_explainability_monitor_logs_failure(model_explainability_monitor, sagemaker_session):
    """get_latest_execution_logs must raise ValueError when there is nothing to show.

    Covers two stubbed responses: no executions at all, and a latest execution
    without a processing job.
    """
    failure_cases = (
        (MONITORING_EXECUTIONS_EMPTY, "No execution jobs were kicked off."),
        (
            MONITORING_EXECUTIONS_NO_PROCESSING_JOB,
            "Processing Job did not run for the last execution",
        ),
    )
    for response, expected_message in failure_cases:
        sagemaker_session.list_monitoring_executions = MagicMock(return_value=response)
        try:
            model_explainability_monitor.get_latest_execution_logs()
        except ValueError as ve:
            assert expected_message in str(ve)
        else:
            # Without this branch the test would pass even if nothing was raised.
            raise AssertionError(
                "Expected ValueError containing: {}".format(expected_message)
            )

0 commit comments

Comments
 (0)