Skip to content

feat: Support to get latest monitoring execution processing logs #4036

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions src/sagemaker/model_monitor/clarify_model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from sagemaker.session import Session
from sagemaker.utils import name_from_base
from sagemaker.clarify import SageMakerClarifyProcessor, ModelPredictedLabelConfig
from sagemaker.lineage._utils import get_resource_name_from_arn

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -154,6 +155,29 @@ def list_executions(self):
for execution in executions
]

def get_latest_execution_logs(self, wait=False):
"""Get the processing job logs for the most recent monitoring execution

Args:
wait (bool): Whether the call should wait until the job completes (default: False).

Raises:
ValueError: If no execution job or processing job for the last execution has run

Returns: None
"""
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
monitoring_schedule_name=self.monitoring_schedule_name
)
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
raise ValueError("No execution jobs were kicked off.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add a "Will Raise" section in docstrings for these

if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
raise ValueError("Processing Job did not run for the last execution")
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
self.sagemaker_session.logs_for_processing_job(
job_name=get_resource_name_from_arn(job_arn), wait=wait
)

def _create_baselining_processor(self):
"""Create and return a SageMakerClarifyProcessor object which will run the baselining job.

Expand Down
24 changes: 24 additions & 0 deletions src/sagemaker/model_monitor/model_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
resolve_value_from_config,
resolve_class_attribute_from_config,
)
from sagemaker.lineage._utils import get_resource_name_from_arn

DEFAULT_REPOSITORY_NAME = "sagemaker-model-monitor-analyzer"

Expand Down Expand Up @@ -768,6 +769,29 @@ def list_executions(self):

return monitoring_executions

def get_latest_execution_logs(self, wait=False):
"""Get the processing job logs for the most recent monitoring execution

Args:
wait (bool): Whether the call should wait until the job completes (default: False).

Raises:
ValueError: If no execution job or processing job for the last execution has run

Returns: None
"""
monitoring_executions = self.sagemaker_session.list_monitoring_executions(
monitoring_schedule_name=self.monitoring_schedule_name
)
if len(monitoring_executions["MonitoringExecutionSummaries"]) == 0:
raise ValueError("No execution jobs were kicked off.")
if "ProcessingJobArn" not in monitoring_executions["MonitoringExecutionSummaries"][0]:
raise ValueError("Processing Job did not run for the last execution")
job_arn = monitoring_executions["MonitoringExecutionSummaries"][0]["ProcessingJobArn"]
Comment on lines +787 to +790
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment

self.sagemaker_session.logs_for_processing_job(
job_name=get_resource_name_from_arn(job_arn), wait=wait
)

def update_monitoring_alert(
self,
monitoring_alert_name: str,
Expand Down
44 changes: 44 additions & 0 deletions tests/integ/test_model_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,38 @@ def test_default_monitor_suggest_baseline_and_create_monitoring_schedule_with_cu
assert len(summary["MonitoringScheduleSummaries"]) > 0


def test_default_monitor_display_logs_errors(sagemaker_session):
my_default_monitor = DefaultModelMonitor(role=ROLE, sagemaker_session=sagemaker_session)

data_captured_destination_s3_uri = os.path.join(
"s3://",
sagemaker_session.default_bucket(),
"sagemaker-serving-batch-transform",
str(uuid.uuid4()),
)

batch_transform_input = BatchTransformInput(
data_captured_destination_s3_uri=data_captured_destination_s3_uri,
destination="/opt/ml/processing/output",
dataset_format=MonitoringDatasetFormat.csv(header=False),
)

my_default_monitor.create_monitoring_schedule(
batch_transform_input=batch_transform_input,
schedule_cron_expression=CronExpressionGenerator.hourly(),
)

_wait_for_schedule_changes_to_apply(monitor=my_default_monitor)

try:
my_default_monitor.get_latest_execution_logs(wait=False)
except ValueError as ve:
assert "No execution jobs were kicked off." in str(ve)

my_default_monitor.stop_monitoring_schedule()
my_default_monitor.delete_monitoring_schedule()


@pytest.mark.skipif(
tests.integ.test_region() in tests.integ.NO_MODEL_MONITORING_REGIONS,
reason="ModelMonitoring is not yet supported in this region.",
Expand Down Expand Up @@ -1643,6 +1675,7 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule
output_kms_key,
updated_volume_kms_key,
updated_output_kms_key,
capfd,
):
baseline_dataset = os.path.join(DATA_DIR, "monitor/baseline_dataset.csv")

Expand Down Expand Up @@ -1771,6 +1804,10 @@ def test_byoc_monitor_attach_followed_by_baseline_and_update_monitoring_schedule

_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)

_check_processing_logs_generated(
monitor=my_attached_monitor, schedule_description=schedule_description, capfd=capfd
)

my_attached_monitor.stop_monitoring_schedule()

_wait_for_schedule_changes_to_apply(monitor=my_attached_monitor)
Expand Down Expand Up @@ -1877,6 +1914,13 @@ def test_default_monitor_monitoring_alerts(sagemaker_session, predictor):
my_default_monitor.delete_monitoring_schedule()


def _check_processing_logs_generated(monitor, schedule_description, capfd):
monitor.get_latest_execution_logs(wait=False)
out, _ = capfd.readouterr()
assert len(out) > 0
assert schedule_description.get("LastMonitoringExecutionSummary")["ProcessingJobArn"] in out


def _wait_for_schedule_changes_to_apply(monitor):
"""Waits for the monitor to no longer be in the 'Pending' state. Updates take under a minute
to apply.
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/sagemaker/monitor/test_clarify_model_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,14 @@
"NetworkConfig": NETWORK_CONFIG._to_request_dict(),
}

MONITORING_EXECUTIONS_EMPTY = {
"MonitoringExecutionSummaries": [],
}

MONITORING_EXECUTIONS_NO_PROCESSING_JOB = {
"MonitoringExecutionSummaries": [{"MonitoringSchedule": "MonitoringSchedule"}],
}

# For update API
NEW_ROLE_ARN = "arn:aws:iam::012345678902:role/{}".format(ROLE)
NEW_INSTANCE_COUNT = 2
Expand Down Expand Up @@ -1716,3 +1724,20 @@ def _test_model_explainability_monitor_delete_schedule(
sagemaker_session.sagemaker_client.delete_model_explainability_job_definition.assert_called_once_with(
JobDefinitionName=job_definition_name
)


def test_model_explainability_monitor_logs_failure(model_explainability_monitor, sagemaker_session):
sagemaker_session.list_monitoring_executions = MagicMock(
return_value=MONITORING_EXECUTIONS_EMPTY
)
try:
model_explainability_monitor.get_latest_execution_logs()
except ValueError as ve:
assert "No execution jobs were kicked off." in str(ve)
sagemaker_session.list_monitoring_executions = MagicMock(
return_value=MONITORING_EXECUTIONS_NO_PROCESSING_JOB
)
try:
model_explainability_monitor.get_latest_execution_logs()
except ValueError as ve:
assert "Processing Job did not run for the last execution" in str(ve)