
Commit 54dd242

Brock Wade committed
fix: more testing
1 parent 59d9b42 commit 54dd242

File tree

tests/integ/sagemaker/workflow/test_processing_steps.py
tests/integ/sagemaker/workflow/test_workflow.py

2 files changed: +175 -153 lines changed


tests/integ/sagemaker/workflow/test_processing_steps.py

Lines changed: 174 additions & 2 deletions
@@ -17,23 +17,27 @@
 import re
 import subprocess
 from datetime import datetime
+from pathlib import Path
 
 import pytest
 from botocore.exceptions import WaiterError
 
 from sagemaker import image_uris, get_execution_role, utils
 from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition
-from sagemaker.processing import ProcessingInput, ProcessingOutput
+from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor, ScriptProcessor
 from sagemaker.s3 import S3Uploader
-from sagemaker.sklearn import SKLearnProcessor
+from sagemaker.sklearn import SKLearnProcessor, SKLearn
 from sagemaker.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.steps import (
     ProcessingStep,
     CacheConfig,
 )
+from sagemaker.workflow.utilities import hash_files_or_dirs
+from sagemaker.workflow.properties import PropertyFile
 from sagemaker.spark.processing import PySparkProcessor, SparkJarProcessor
 from sagemaker.wrangler.processing import DataWranglerProcessor
+from sagemaker.tensorflow import TensorFlow
 from tests.integ import DATA_DIR
 
 
@@ -379,6 +383,174 @@ def test_one_step_framework_processing_pipeline(
             pass
 
 
+def test_multi_step_framework_processing_pipeline_with_run_method(pipeline_session, role, pipeline_name, region_name):
+    default_bucket = pipeline_session.default_bucket()
+    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
+    evaluation_report = PropertyFile(
+        name="EvaluationReport", output_name="evaluation", path="evaluation.json"
+    )
+
+    image_uri = image_uris.retrieve(
+        framework="xgboost",
+        region=region_name,
+        version="1.0-1",
+        py_version="py3",
+        instance_type="ml.m5.xlarge",
+    )
+
+    query_processor = ScriptProcessor(
+        command=["python3"],
+        image_uri=image_uri,
+        role=role,
+        instance_count=1,
+        instance_type="ml.m5.xlarge",
+        sagemaker_session=pipeline_session,
+    )
+
+    data_processor = FrameworkProcessor(
+        role=role,
+        instance_type="ml.m5.xlarge",
+        instance_count=1,
+        estimator_cls=TensorFlow,
+        framework_version="2.9",
+        py_version="py39",
+        sagemaker_session=pipeline_session,
+    )
+
+    query_step = ProcessingStep(
+        name="Query-Data",
+        step_args=query_processor.run(
+            code=os.path.join(DATA_DIR, "framework_processor_data/query_data.py"),
+            arguments=[
+                "--output-path",
+                "s3://out1",
+                "--region",
+                "s3://out2",
+            ],
+        ),
+        cache_config=cache_config,
+    )
+
+    input_path = "/opt/ml/processing/input"
+    output_path = "/opt/ml/processing/output"
+
+    prepare_step = ProcessingStep(
+        name="Prepare-Data",
+        step_args=data_processor.run(
+            code="preprocess.py",
+            source_dir=DATA_DIR + "/framework_processor_data",
+            inputs=[
+                ProcessingInput(
+                    input_name="task_preprocess_input",
+                    source=query_step.properties.ProcessingOutputConfig.Outputs["task_query_output"].S3Output.S3Uri,
+                    destination=input_path,
+                )
+            ],
+            arguments=[
+                "--input-path",
+                input_path,
+                "--output-path",
+                output_path,
+            ],
+        ),
+        cache_config=cache_config,
+    )
+
+    split_step = ProcessingStep(
+        name="Split-Data",
+        step_args=data_processor.run(
+            code="train_test_split.py",
+            source_dir=DATA_DIR + "/framework_processor_data",
+            inputs=[
+                ProcessingInput(
+                    source=prepare_step.properties.ProcessingOutputConfig.Outputs[
+                        "task_preprocess_output"
+                    ].S3Output.S3Uri,
+                    destination=input_path,
+                ),
+            ],
+            arguments=["--input-path", input_path, "--output-path", output_path],
+        ),
+        cache_config=cache_config,
+    )
+
+    sk_processor = FrameworkProcessor(
+        framework_version="1.0-1",
+        instance_type="ml.m5.xlarge",
+        instance_count=1,
+        base_job_name="my-job",
+        role=role,
+        estimator_cls=SKLearn,
+        sagemaker_session=pipeline_session,
+    )
+
+    evaluate_step = ProcessingStep(
+        name="Evaluate-Model",
+        step_args=sk_processor.run(
+            code="evaluate.py",
+            source_dir=DATA_DIR + "/framework_processor_data",
+            outputs=[
+                ProcessingOutput(
+                    output_name="evaluation",
+                    source="/opt/ml/processing/evaluation",
+                ),
+            ],
+        ),
+        property_files=[evaluation_report],
+        cache_config=cache_config,
+    )
+
+    pipeline = Pipeline(
+        name=pipeline_name,
+        steps=[query_step, prepare_step, split_step, evaluate_step]
+    )
+    try:
+
+        pipeline.create(role)
+
+        definition = json.loads(pipeline.definition())
+
+        execution = pipeline.start(parameters={})
+        assert re.match(
+            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline.name}/execution/",
+            execution.arn,
+        )
+
+        try:
+            execution.wait(delay=60, max_attempts=3)
+        except WaiterError as test:
+            print(test)
+            pass
+        execution_steps = execution.list_steps()
+        print("Execution Steps: ", execution_steps)
+        assert len(execution_steps) == 4
+
+        definition = json.loads(pipeline.definition())
+
+        source_dir_tar_prefix = f"s3://{default_bucket}/{pipeline.name}" \
+            f"/code/{hash_files_or_dirs([DATA_DIR + '/framework_processor_data'])}"
+
+        run_procs = []
+
+        for step in definition["Steps"]:
+            for input_obj in step["Arguments"]["ProcessingInputs"]:
+                if input_obj["InputName"] == "entrypoint":
+                    s3_uri = input_obj["S3Input"]["S3Uri"]
+                    run_procs.append(s3_uri)
+
+                    # verify runproc.sh prefix is different from code artifact prefix
+                    assert Path(s3_uri).parent != source_dir_tar_prefix
+
+        # verify all the run_proc.sh artifact paths are distinct
+        assert len(run_procs) == len(set(run_procs))
+
+    finally:
+        try:
+            pipeline.delete()
+        except Exception:
+            pass
+
+
 def test_one_step_pyspark_processing_pipeline(
     sagemaker_session,
     role,

tests/integ/sagemaker/workflow/test_workflow.py

Lines changed: 1 addition & 151 deletions
@@ -19,14 +19,11 @@
 import time
 import shutil
 
-from pathlib import Path
 from contextlib import contextmanager
 import pytest
 
 from botocore.exceptions import WaiterError
 import pandas as pd
-from sagemaker.network import NetworkConfig
-from sagemaker.tensorflow import TensorFlow
 
 from tests.integ.s3_utils import extract_files_from_s3
 from sagemaker.workflow.model_step import (
@@ -49,8 +46,7 @@
     ProcessingInput,
     ProcessingOutput,
     FeatureStoreOutput,
-    ScriptProcessor,
-    FrameworkProcessor
+    ScriptProcessor
 )
 from sagemaker.s3 import S3Uploader
 from sagemaker.session import get_execution_role
@@ -1315,149 +1311,3 @@ def test_caching_behavior(
         except Exception:
             os.remove(script_dir + "/dummy_script.py")
             pass
-
-def test_processing_steps_with_framework_processor(pipeline_session, role):
-    default_bucket = pipeline_session.default_bucket()
-    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
-    evaluation_report = PropertyFile(
-        name="EvaluationReport", output_name="evaluation", path="evaluation.json"
-    )
-    query_processor = ScriptProcessor(
-        command=["python3"],
-        image_uri="my-img",
-        role=role,
-        instance_count=1,
-        instance_type="ml.m5.xlarge",
-        network_config=NetworkConfig(
-            enable_network_isolation=False,
-            # VPC-Prod
-            subnets=["subnet-something"],
-            security_group_ids=["sg-something"],
-        ),
-        sagemaker_session=pipeline_session,
-    )
-
-    data_processor = FrameworkProcessor(
-        role=role,
-        instance_type="ml.m5.xlarge",
-        instance_count=1,
-        estimator_cls=TensorFlow,
-        framework_version="2.9",
-        py_version="py39",
-        sagemaker_session=pipeline_session,
-    )
-
-    query_step = ProcessingStep(
-        name="Query-Data",
-        step_args=query_processor.run(
-            code=os.path.join(DATA_DIR, "framework_processor_data/query_data.py"),
-            arguments=[
-                "--output-path",
-                "s3://out1",
-                "--region",
-                "s3://out2",
-            ],
-        ),
-        cache_config=cache_config,
-    )
-
-    input_path = "/opt/ml/processing/input"
-    output_path = "/opt/ml/processing/output"
-
-    prepare_step = ProcessingStep(
-        name="Prepare-Data",
-        step_args=data_processor.run(
-            code="preprocess.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            inputs=[
-                ProcessingInput(
-                    input_name="task_preprocess_input",
-                    source=query_step.properties.ProcessingOutputConfig.Outputs["task_query_output"].S3Output.S3Uri,
-                    destination=input_path,
-                )
-            ],
-            arguments=[
-                "--input-path",
-                input_path,
-                "--output-path",
-                output_path,
-            ],
-        ),
-        cache_config=cache_config,
-    )
-
-    split_step = ProcessingStep(
-        name="Split-Data",
-        step_args=data_processor.run(
-            code="train_test_split.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            inputs=[
-                ProcessingInput(
-                    source=prepare_step.properties.ProcessingOutputConfig.Outputs[
-                        "task_preprocess_output"
-                    ].S3Output.S3Uri,
-                    destination=input_path,
-                ),
-            ],
-            arguments=["--input-path", input_path, "--output-path", output_path],
-        ),
-        cache_config=cache_config,
-    )
-
-    sk_processor = FrameworkProcessor(
-        framework_version="1.0-1",
-        instance_type="ml.m5.xlarge",
-        instance_count=1,
-        base_job_name="my-job",
-        role=role,
-        estimator_cls=SKLearn,
-        sagemaker_session=pipeline_session,
-    )
-
-    evaluate_step = ProcessingStep(
-        name="Evaluate-Model",
-        step_args=sk_processor.run(
-            code="evaluate.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            outputs=[
-                ProcessingOutput(
-                    output_name="evaluation",
-                    source="/opt/ml/processing/evaluation",
-                ),
-            ],
-        ),
-        property_files=[evaluation_report],
-        cache_config=cache_config,
-    )
-
-    pipeline = Pipeline(
-        name="test-fw-proc-steps-pipeline",
-        steps=[query_step, prepare_step, split_step, evaluate_step]
-    )
-    try:
-        # create pipeline
-        pipeline.create(role)
-        definition = json.loads(pipeline.definition())
-
-        source_dir_tar_prefix = f"s3://{default_bucket}/{pipeline.name}" \
-            f"/code/{hash_files_or_dirs([DATA_DIR + '/framework_processor_data'])}"
-
-        run_procs = []
-
-        for step in definition["Steps"]:
-            for input_obj in step["Arguments"]["ProcessingInputs"]:
-                if input_obj["InputName"] == "entrypoint":
-                    s3_uri = input_obj["S3Input"]["S3Uri"]
-                    run_procs.append(s3_uri)
-
-                    # verify runproc.sh prefix is different from code artifact prefix
-                    assert Path(s3_uri).parent != source_dir_tar_prefix
-
-        # verify all the run_proc.sh artifact paths are distinct
-        assert len(run_procs) == len(set(run_procs))
-
-    finally:
-        try:
-            pipeline.delete()
-        except Exception:
-            pass

0 commit comments
