
Commit e8deeb3

rohangujarathi (Rohan Gujarathi) and metrizable authored
change: include workflow integ tests with clarify and debugger enabled (#2024)
* change: include workflow integ tests with clarify and debugger enabled
* avoid use of os.path.join for s3 paths
* remove hardcoded pytorch framework and py_version
* Don't override analysis config
* Update output_path
* Simplify assert statements
* correct black failure
* Update tests/integ/test_workflow_with_clarify.py

Co-authored-by: Rohan Gujarathi <[email protected]>
Co-authored-by: Eric Johnson <[email protected]>
1 parent 2cc9bc3 commit e8deeb3
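
One of the squashed commits above swaps os.path.join out of S3 path construction. As a quick illustration (this sketch is not part of the commit; the bucket and prefix names are invented), os.path.join builds paths with the host OS separator, so on Windows it can inject backslashes into what must stay a forward-slash S3 URI, while an f-string keeps the URI well formed on every platform:

import os

# Hypothetical names, for illustration only.
bucket = "example-bucket"
prefix = "clarify/analysis"

# os.path.join uses os.sep; on Windows this yields
# "s3://example-bucket\clarify/analysis\output", which is not a valid S3 URI.
joined = os.path.join(f"s3://{bucket}", prefix, "output")

# Plain string formatting, as these integ tests do, always yields forward slashes.
formatted = f"s3://{bucket}/{prefix}/output"

print(joined)
print(formatted)

On Linux and macOS the two happen to agree, which is why the os.path.join version can look harmless until the tests run on another platform.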

File tree

tests/integ/test_workflow.py
tests/integ/test_workflow_with_clarify.py

2 files changed: +381 -0 lines


tests/integ/test_workflow.py

Lines changed: 96 additions & 0 deletions
@@ -16,12 +16,18 @@
 import os
 import re
 import time
+import uuid
 
 import boto3
 import pytest
 
 from botocore.config import Config
 from botocore.exceptions import WaiterError
+from sagemaker.debugger import (
+    DebuggerHookConfig,
+    Rule,
+    rule_configs,
+)
 from sagemaker.inputs import CreateModelInput, TrainingInput
 from sagemaker.model import Model
 from sagemaker.processing import ProcessingInput, ProcessingOutput
@@ -401,3 +407,93 @@ def test_conditional_pytorch_training_model_registration(
             pipeline.delete()
         except Exception:
             pass
+
+
+def test_training_job_with_debugger(
+    sagemaker_session,
+    pipeline_name,
+    role,
+    pytorch_training_latest_version,
+    pytorch_training_latest_py_version,
+):
+    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
+    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
+
+    rules = [
+        Rule.sagemaker(rule_configs.vanishing_gradient()),
+        Rule.sagemaker(base_config=rule_configs.all_zero(), rule_parameters={"tensor_regex": ".*"}),
+        Rule.sagemaker(rule_configs.loss_not_decreasing()),
+    ]
+    debugger_hook_config = DebuggerHookConfig(
+        s3_output_path=f"s3://{sagemaker_session.default_bucket()}/{uuid.uuid4()}/tensors"
+    )
+
+    base_dir = os.path.join(DATA_DIR, "pytorch_mnist")
+    script_path = os.path.join(base_dir, "mnist.py")
+    input_path = sagemaker_session.upload_data(
+        path=os.path.join(base_dir, "training"),
+        key_prefix="integ-test-data/pytorch_mnist/training",
+    )
+    inputs = TrainingInput(s3_data=input_path)
+
+    pytorch_estimator = PyTorch(
+        entry_point=script_path,
+        role="SageMakerRole",
+        framework_version=pytorch_training_latest_version,
+        py_version=pytorch_training_latest_py_version,
+        instance_count=instance_count,
+        instance_type=instance_type,
+        sagemaker_session=sagemaker_session,
+        rules=rules,
+        debugger_hook_config=debugger_hook_config,
+    )
+
+    step_train = TrainingStep(
+        name="pytorch-train",
+        estimator=pytorch_estimator,
+        inputs=inputs,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        parameters=[instance_count, instance_type],
+        steps=[step_train],
+        sagemaker_session=sagemaker_session,
+    )
+
+    try:
+        response = pipeline.create(role)
+        create_arn = response["PipelineArn"]
+
+        execution = pipeline.start()
+        response = execution.describe()
+        assert response["PipelineArn"] == create_arn
+
+        try:
+            execution.wait(delay=10, max_attempts=60)
+        except WaiterError:
+            pass
+        execution_steps = execution.list_steps()
+        training_job_arn = execution_steps[0]["Metadata"]["TrainingJob"]["Arn"]
+        job_description = sagemaker_session.sagemaker_client.describe_training_job(
+            TrainingJobName=training_job_arn.split("/")[1]
+        )
+
+        assert len(execution_steps) == 1
+        assert execution_steps[0]["StepName"] == "pytorch-train"
+        assert execution_steps[0]["StepStatus"] == "Succeeded"
+
+        for index, rule in enumerate(rules):
+            config = job_description["DebugRuleConfigurations"][index]
+            assert config["RuleConfigurationName"] == rule.name
+            assert config["RuleEvaluatorImage"] == rule.image_uri
+            assert config["VolumeSizeInGB"] == 0
+            assert (
+                config["RuleParameters"]["rule_to_invoke"] == rule.rule_parameters["rule_to_invoke"]
+            )
+        assert job_description["DebugHookConfig"] == debugger_hook_config._to_request_dict()
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
tests/integ/test_workflow_with_clarify.py

Lines changed: 285 additions & 0 deletions
@@ -0,0 +1,285 @@
+# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"). You
+# may not use this file except in compliance with the License. A copy of
+# the License is located at
+#
+#     http://aws.amazon.com/apache2.0/
+#
+# or in the "license" file accompanying this file. This file is
+# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
+# ANY KIND, either express or implied. See the License for the specific
+# language governing permissions and limitations under the License.
+from __future__ import absolute_import
+
+import json
+import os
+import statistics
+import time
+import tempfile
+
+import pytest
+import numpy as np
+import pandas as pd
+from botocore.exceptions import WaiterError
+from sagemaker.amazon.linear_learner import LinearLearner, LinearLearnerPredictor
+from sagemaker.clarify import (
+    BiasConfig,
+    DataConfig,
+    ModelConfig,
+    ModelPredictedLabelConfig,
+    SageMakerClarifyProcessor,
+)
+from sagemaker.processing import ProcessingInput, ProcessingOutput
+from sagemaker.session import get_execution_role
+from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
+from sagemaker.workflow.condition_step import ConditionStep, JsonGet
+from sagemaker.workflow.parameters import (
+    ParameterInteger,
+    ParameterString,
+)
+from sagemaker.workflow.steps import ProcessingStep
+from sagemaker.workflow.properties import PropertyFile
+from sagemaker.workflow.pipeline import Pipeline
+
+from sagemaker import utils
+from tests import integ
+from tests.integ import timeout
+
+
+@pytest.fixture(scope="module")
+def role(sagemaker_session):
+    return get_execution_role(sagemaker_session)
+
+
+@pytest.fixture
+def pipeline_name():
+    return f"my-pipeline-clarify-{int(time.time() * 10**7)}"
+
+
+@pytest.fixture(scope="module")
+def training_set():
+    label = (np.random.rand(100, 1) > 0.5).astype(np.int32)
+    features = np.random.rand(100, 4)
+    return features, label
+
+
+@pytest.yield_fixture(scope="module")
+def data_path(training_set):
+    features, label = training_set
+    data = pd.concat([pd.DataFrame(label), pd.DataFrame(features)], axis=1, sort=False)
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        filename = os.path.join(tmpdirname, "train.csv")
+        data.to_csv(filename, index=False, header=False)
+        yield filename
+
+
+@pytest.fixture(scope="module")
+def headers():
+    return [
+        "Label",
+        "F1",
+        "F2",
+        "F3",
+        "F4",
+    ]
+
+
+@pytest.fixture(scope="module")
+def data_config(sagemaker_session, data_path, headers):
+    output_path = f"s3://{sagemaker_session.default_bucket()}/linear_learner_analysis_result"
+    return DataConfig(
+        s3_data_input_path=data_path,
+        s3_output_path=output_path,
+        label="Label",
+        headers=headers,
+        dataset_type="text/csv",
+    )
+
+
+@pytest.fixture(scope="module")
+def data_bias_config():
+    return BiasConfig(
+        label_values_or_threshold=[1],
+        facet_name="F1",
+        facet_values_or_threshold=[0.5],
+        group_name="F2",
+    )
+
+
+@pytest.yield_fixture(scope="module")
+def model_name(sagemaker_session, cpu_instance_type, training_set):
+    job_name = utils.unique_name_from_base("clarify-xgb")
+
+    with timeout.timeout(minutes=integ.TRAINING_DEFAULT_TIMEOUT_MINUTES):
+        ll = LinearLearner(
+            "SageMakerRole",
+            1,
+            cpu_instance_type,
+            predictor_type="binary_classifier",
+            sagemaker_session=sagemaker_session,
+            disable_profiler=True,
+        )
+        ll.binary_classifier_model_selection_criteria = "accuracy"
+        ll.early_stopping_tolerance = 0.0001
+        ll.early_stopping_patience = 3
+        ll.num_models = 1
+        ll.epochs = 1
+        ll.num_calibration_samples = 1
+
+        features, label = training_set
+        ll.fit(
+            ll.record_set(features.astype(np.float32), label.reshape(-1).astype(np.float32)),
+            job_name=job_name,
+        )
+
+    with timeout.timeout_and_delete_endpoint_by_name(job_name, sagemaker_session):
+        ll.deploy(1, cpu_instance_type, endpoint_name=job_name, model_name=job_name, wait=True)
+        yield job_name
+
+
+@pytest.fixture(scope="module")
+def model_config(model_name):
+    return ModelConfig(
+        model_name=model_name,
+        instance_type="ml.c5.xlarge",
+        instance_count=1,
+        accept_type="application/jsonlines",
+    )
+
+
+@pytest.fixture(scope="module")
+def model_predicted_label_config(sagemaker_session, model_name, training_set):
+    predictor = LinearLearnerPredictor(
+        model_name,
+        sagemaker_session=sagemaker_session,
+    )
+    result = predictor.predict(training_set[0].astype(np.float32))
+    predictions = [float(record.label["score"].float32_tensor.values[0]) for record in result]
+    probability_threshold = statistics.median(predictions)
+    return ModelPredictedLabelConfig(label="score", probability_threshold=probability_threshold)
+
+
+def test_workflow_with_clarify(
+    data_config,
+    data_bias_config,
+    model_config,
+    model_predicted_label_config,
+    pipeline_name,
+    role,
+    sagemaker_session,
+):
+
+    instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
+    instance_count = ParameterInteger(name="InstanceCount", default_value=1)
+
+    analysis_config = data_config.get_config()
+    analysis_config.update(data_bias_config.get_config())
+    (
+        probability_threshold,
+        predictor_config,
+    ) = model_predicted_label_config.get_predictor_config()
+    predictor_config.update(model_config.get_predictor_config())
+    analysis_config["methods"] = {"post_training_bias": {"methods": "all"}}
+    analysis_config["predictor"] = predictor_config
+    analysis_config["probability_threshold"] = probability_threshold
+    analysis_config["methods"]["report"] = {"name": "report", "title": "Analysis Report"}
+
+    with tempfile.TemporaryDirectory() as tmpdirname:
+        analysis_config_file = os.path.join(tmpdirname, "analysis_config.json")
+        with open(analysis_config_file, "w") as f:
+            json.dump(analysis_config, f)
+        config_input = ProcessingInput(
+            input_name="analysis_config",
+            source=analysis_config_file,
+            destination="/opt/ml/processing/input/config",
+            s3_data_type="S3Prefix",
+            s3_input_mode="File",
+            s3_compression_type="None",
+        )
+
+        data_input = ProcessingInput(
+            input_name="dataset",
+            source=data_config.s3_data_input_path,
+            destination="/opt/ml/processing/input/data",
+            s3_data_type="S3Prefix",
+            s3_input_mode="File",
+            s3_data_distribution_type=data_config.s3_data_distribution_type,
+            s3_compression_type=data_config.s3_compression_type,
+        )
+
+        result_output = ProcessingOutput(
+            source="/opt/ml/processing/output",
+            destination=data_config.s3_output_path,
+            output_name="analysis_result",
+            s3_upload_mode="EndOfJob",
+        )
+
+        processor = SageMakerClarifyProcessor(
+            role="SageMakerRole",
+            instance_count=instance_count,
+            instance_type=instance_type,
+            sagemaker_session=sagemaker_session,
+        )
+
+        property_file = PropertyFile(
+            name="BiasOutput",
+            output_name="analysis_result",
+            path="analysis.json",
+        )
+
+        step_process = ProcessingStep(
+            name="my-process",
+            processor=processor,
+            inputs=[data_input, config_input],
+            outputs=[result_output],
+            property_files=[property_file],
+        )
+
+        cond_left = JsonGet(
+            step=step_process,
+            property_file="BiasOutput",
+            json_path="post_training_bias_metrics.facets.F1[0].metrics[0].value",
+        )
+
+        step_condition = ConditionStep(
+            name="bias-condition",
+            conditions=[ConditionLessThanOrEqualTo(left=cond_left, right=1)],
+            if_steps=[],
+            else_steps=[],
+        )
+
+        pipeline = Pipeline(
+            name=pipeline_name,
+            parameters=[instance_type, instance_count],
+            steps=[step_process, step_condition],
+            sagemaker_session=sagemaker_session,
+        )
+
+        try:
+            response = pipeline.create(role)
+            create_arn = response["PipelineArn"]
+
+            execution = pipeline.start(parameters={})
+
+            response = execution.describe()
+            assert response["PipelineArn"] == create_arn
+
+            try:
+                execution.wait(delay=30, max_attempts=60)
+            except WaiterError:
+                pass
+            execution_steps = execution.list_steps()
+
+            assert len(execution_steps) == 2
+            assert execution_steps[1]["StepName"] == "my-process"
+            assert execution_steps[1]["StepStatus"] == "Succeeded"
+            assert execution_steps[0]["StepName"] == "bias-condition"
+            assert execution_steps[0]["StepStatus"] == "Succeeded"
+            assert execution_steps[0]["Metadata"]["Condition"]["Outcome"] == "True"

+        finally:
+            try:
+                pipeline.delete()
+            except Exception:
+                pass
