Commit 623d85b

Author: Brock Wade (committed)
Commit message: testing

1 parent: c8966c7

File tree: 6 files changed, +145 −183 lines


tests/data/framework_processor_data/evaluate.py

Lines changed: 1 addition & 2 deletions
@@ -2,5 +2,4 @@
 Integ test file evaluate.py
 """
 
-def evaluate():
-    print("evaluate")
+print("test evaluate script")

tests/data/framework_processor_data/preprocess.py

Lines changed: 1 addition & 2 deletions
@@ -2,5 +2,4 @@
 Integ test file preprocess.py
 """
 
-def preprocess():
-    print("preprocess")
+print("test preprocess script")

tests/data/framework_processor_data/query_data.py

Lines changed: 1 addition & 2 deletions
@@ -2,5 +2,4 @@
 Integ test file query_data.py
 """
 
-def query_data():
-    print("query data")
+print("test query data script")

tests/data/framework_processor_data/train_test_split.py

Lines changed: 1 addition & 2 deletions
@@ -2,5 +2,4 @@
 Integ test file train_test_split.py
 """
 
-def train_test_split():
-    print("train, test, split")
+print("test train, test, split script")

tests/integ/sagemaker/workflow/test_processing_steps.py

Lines changed: 2 additions & 174 deletions
@@ -17,27 +17,23 @@
 import re
 import subprocess
 from datetime import datetime
-from pathlib import Path
 
 import pytest
 from botocore.exceptions import WaiterError
 
 from sagemaker import image_uris, get_execution_role, utils
 from sagemaker.dataset_definition import DatasetDefinition, AthenaDatasetDefinition
-from sagemaker.processing import ProcessingInput, ProcessingOutput, FrameworkProcessor, ScriptProcessor
+from sagemaker.processing import ProcessingInput, ProcessingOutput
 from sagemaker.s3 import S3Uploader
-from sagemaker.sklearn import SKLearnProcessor, SKLearn
+from sagemaker.sklearn import SKLearnProcessor
 from sagemaker.workflow.parameters import ParameterInteger, ParameterString
 from sagemaker.workflow.pipeline import Pipeline
 from sagemaker.workflow.steps import (
     ProcessingStep,
     CacheConfig,
 )
-from sagemaker.workflow.utilities import hash_files_or_dirs
-from sagemaker.workflow.properties import PropertyFile
 from sagemaker.spark.processing import PySparkProcessor, SparkJarProcessor
 from sagemaker.wrangler.processing import DataWranglerProcessor
-from sagemaker.tensorflow import TensorFlow
 from tests.integ import DATA_DIR
 

@@ -383,174 +379,6 @@ def test_one_step_framework_processing_pipeline(
     pass
 
 
-def test_multi_step_framework_processing_pipeline_with_run_method(pipeline_session, role, pipeline_name, region_name):
-    default_bucket = pipeline_session.default_bucket()
-    cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
-    evaluation_report = PropertyFile(
-        name="EvaluationReport", output_name="evaluation", path="evaluation.json"
-    )
-
-    image_uri = image_uris.retrieve(
-        framework="xgboost",
-        region=region_name,
-        version="1.0-1",
-        py_version="py3",
-        instance_type="ml.m5.xlarge",
-    )
-
-    query_processor = ScriptProcessor(
-        command=["python3"],
-        image_uri=image_uri,
-        role=role,
-        instance_count=1,
-        instance_type="ml.m5.xlarge",
-        sagemaker_session=pipeline_session,
-    )
-
-    data_processor = FrameworkProcessor(
-        role=role,
-        instance_type="ml.m5.xlarge",
-        instance_count=1,
-        estimator_cls=TensorFlow,
-        framework_version="2.9",
-        py_version="py39",
-        sagemaker_session=pipeline_session,
-    )
-
-    query_step = ProcessingStep(
-        name="Query-Data",
-        step_args=query_processor.run(
-            code=os.path.join(DATA_DIR, "framework_processor_data/query_data.py"),
-            arguments=[
-                "--output-path",
-                "s3://out1",
-                "--region",
-                "s3://out2",
-            ],
-        ),
-        cache_config=cache_config,
-    )
-
-    input_path = "/opt/ml/processing/input"
-    output_path = "/opt/ml/processing/output"
-
-    prepare_step = ProcessingStep(
-        name="Prepare-Data",
-        step_args=data_processor.run(
-            code="preprocess.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            inputs=[
-                ProcessingInput(
-                    input_name="task_preprocess_input",
-                    source=query_step.properties.ProcessingOutputConfig.Outputs["task_query_output"].S3Output.S3Uri,
-                    destination=input_path,
-                )
-            ],
-            arguments=[
-                "--input-path",
-                input_path,
-                "--output-path",
-                output_path,
-            ],
-        ),
-        cache_config=cache_config,
-    )
-
-    split_step = ProcessingStep(
-        name="Split-Data",
-        step_args=data_processor.run(
-            code="train_test_split.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            inputs=[
-                ProcessingInput(
-                    source=prepare_step.properties.ProcessingOutputConfig.Outputs[
-                        "task_preprocess_output"
-                    ].S3Output.S3Uri,
-                    destination=input_path,
-                ),
-            ],
-            arguments=["--input-path", input_path, "--output-path", output_path],
-        ),
-        cache_config=cache_config,
-    )
-
-    sk_processor = FrameworkProcessor(
-        framework_version="1.0-1",
-        instance_type="ml.m5.xlarge",
-        instance_count=1,
-        base_job_name="my-job",
-        role=role,
-        estimator_cls=SKLearn,
-        sagemaker_session=pipeline_session,
-    )
-
-    evaluate_step = ProcessingStep(
-        name="Evaluate-Model",
-        step_args=sk_processor.run(
-            code="evaluate.py",
-            source_dir=DATA_DIR + "/framework_processor_data",
-            outputs=[
-                ProcessingOutput(
-                    output_name="evaluation",
-                    source="/opt/ml/processing/evaluation",
-                ),
-            ],
-        ),
-        property_files=[evaluation_report],
-        cache_config=cache_config,
-    )
-
-    pipeline = Pipeline(
-        name=pipeline_name,
-        steps=[query_step, prepare_step, split_step, evaluate_step]
-    )
-    try:
-
-        pipeline.create(role)
-
-        definition = json.loads(pipeline.definition())
-
-        execution = pipeline.start(parameters={})
-        assert re.match(
-            rf"arn:aws:sagemaker:{region_name}:\d{{12}}:pipeline/{pipeline.name}/execution/",
-            execution.arn,
-        )
-
-        try:
-            execution.wait(delay=60, max_attempts=3)
-        except WaiterError as test:
-            print(test)
-            pass
-        execution_steps = execution.list_steps()
-        print("Execution Steps: ", execution_steps)
-        assert len(execution_steps) == 4
-
-        definition = json.loads(pipeline.definition())
-
-        source_dir_tar_prefix = f"s3://{default_bucket}/{pipeline.name}" \
-            f"/code/{hash_files_or_dirs([DATA_DIR + '/framework_processor_data'])}"
-
-        run_procs = []
-
-        for step in definition["Steps"]:
-            for input_obj in step["Arguments"]["ProcessingInputs"]:
-                if input_obj["InputName"] == "entrypoint":
-                    s3_uri = input_obj["S3Input"]["S3Uri"]
-                    run_procs.append(s3_uri)
-
-                    # verify runproc.sh prefix is different from code artifact prefix
-                    assert Path(s3_uri).parent != source_dir_tar_prefix
-
-        # verify all the run_proc.sh artifact paths are distinct
-        assert len(run_procs) == len(set(run_procs))
-
-    finally:
-        try:
-            pipeline.delete()
-        except Exception:
-            pass
-
-
 def test_one_step_pyspark_processing_pipeline(
     sagemaker_session,
     role,
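The removed test chained four processing steps and asserted that each step's generated runproc.sh entrypoint was staged under its own S3 prefix, distinct from the source_dir code artifact; single-step coverage in test_one_step_framework_processing_pipeline remains. For orientation only, here is a hedged sketch of that surviving one-step pattern; it is not the repository's actual test, and pipeline_session, role, pipeline_name, and DATA_DIR are assumed pytest fixtures or constants mirroring the removed code.

from sagemaker.processing import FrameworkProcessor
from sagemaker.sklearn import SKLearn
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.steps import ProcessingStep
from tests.integ import DATA_DIR

# pipeline_session, role, and pipeline_name are assumed fixtures here.
processor = FrameworkProcessor(
    estimator_cls=SKLearn,
    framework_version="1.0-1",
    role=role,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    sagemaker_session=pipeline_session,
)

# step_args packages the processor.run() call so the pipeline executes it lazily.
step = ProcessingStep(
    name="Evaluate-Model",
    step_args=processor.run(
        code="evaluate.py",
        source_dir=DATA_DIR + "/framework_processor_data",
    ),
)

pipeline = Pipeline(
    name=pipeline_name,
    steps=[step],
    sagemaker_session=pipeline_session,
)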
