feature: support JsonGet/Join parameterization in tuning step Hyperparameters #2833

Merged: merged 1 commit on Jan 13, 2022
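
For context: this change lets SageMaker Pipelines parameters and functions (JsonGet, Join) flow through tuning-step hyperparameters and parameter ranges without being coerced to strings. A minimal usage sketch, loosely adapted from the integration test in this diff; it is not part of the change itself, and the role ARN, framework/Python versions, and S3 paths are placeholders:

from sagemaker.inputs import TrainingInput
from sagemaker.parameter import IntegerParameter
from sagemaker.pytorch import PyTorch
from sagemaker.tuner import HyperparameterTuner
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import TuningStep

# A property-file value produced by an earlier ProcessingStep (names are illustrative).
json_get_hp = JsonGet(step_name="my-process", property_file="DataAttributes", json_path="train_size")

estimator = PyTorch(
    entry_point="mnist.py",
    role="arn:aws:iam::111122223333:role/SageMakerRole",  # placeholder
    framework_version="1.8.1",
    py_version="py36",
    instance_count=1,
    instance_type="ml.m5.xlarge",
    # Static hyperparameters may now include pipeline parameters/functions; they are no longer str()-ed.
    hyperparameters={
        "static-hp": ParameterString(name="StaticHP", default_value="42"),
        "train_size": json_get_hp,
    },
)

tuner = HyperparameterTuner(
    estimator,
    objective_metric_name="test:acc",
    metric_definitions=[{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}],
    # Range bounds may also be pipeline parameters/functions.
    hyperparameter_ranges={
        "batch-size": IntegerParameter(
            ParameterString(name="MinBatchSize", default_value="64"), json_get_hp
        ),
    },
)

step_tune = TuningStep(
    name="my-tuning-step",
    tuner=tuner,
    inputs=TrainingInput(s3_data="s3://my-bucket/train"),  # placeholder path
)
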
15 changes: 9 additions & 6 deletions src/sagemaker/parameter.py
@@ -15,6 +15,8 @@

import json
from sagemaker.workflow.parameters import Parameter as PipelineParameter
from sagemaker.workflow.functions import JsonGet as PipelineJsonGet
from sagemaker.workflow.functions import Join as PipelineJoin


class ParameterRange(object):
@@ -71,10 +73,10 @@ def as_tuning_range(self, name):
return {
"Name": name,
"MinValue": str(self.min_value)
if not isinstance(self.min_value, PipelineParameter)
if not isinstance(self.min_value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
else self.min_value,
"MaxValue": str(self.max_value)
if not isinstance(self.max_value, PipelineParameter)
if not isinstance(self.max_value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
else self.max_value,
"ScalingType": self.scaling_type,
}
@@ -108,10 +110,11 @@ def __init__(self, values): # pylint: disable=super-init-not-called
values (list or object): The possible values for the hyperparameter.
This input will be converted into a list of strings.
"""
if isinstance(values, list):
self.values = [str(v) if not isinstance(v, PipelineParameter) else v for v in values]
else:
self.values = [str(values) if not isinstance(values, PipelineParameter) else values]
values = values if isinstance(values, list) else [values]
self.values = [
str(v) if not isinstance(v, (PipelineParameter, PipelineJsonGet, PipelineJoin)) else v
for v in values
]

def as_tuning_range(self, name):
"""Represent the parameter range as a dictionary.
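
The net effect of the parameter.py changes, sketched with illustrative values: plain values are still stringified, while pipeline parameters and functions pass through untouched. The printed output in the comments is approximate (object reprs will differ):

from sagemaker.parameter import CategoricalParameter, ContinuousParameter
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterString

optimizer = Join(on="-", values=["adam", ParameterString(name="Variant", default_value="w")])

# CategoricalParameter keeps the Join object in its values list instead of str()-ing it.
cat = CategoricalParameter(["sgd", optimizer])
print(cat.values)  # roughly: ['sgd', <Join ...>]

# as_tuning_range leaves pipeline values untouched in MinValue/MaxValue.
cont = ContinuousParameter(0.01, ParameterString(name="MaxLR", default_value="0.1"))
print(cont.as_tuning_range("learning-rate"))
# roughly: {'Name': 'learning-rate', 'MinValue': '0.01', 'MaxValue': <ParameterString ...>, 'ScalingType': 'Auto'}
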
23 changes: 22 additions & 1 deletion src/sagemaker/tuner.py
@@ -38,6 +38,10 @@
IntegerParameter,
ParameterRange,
)
from sagemaker.workflow.parameters import Parameter as PipelineParameter
from sagemaker.workflow.functions import JsonGet as PipelineJsonGet
from sagemaker.workflow.functions import Join as PipelineJoin

from sagemaker.session import Session
from sagemaker.utils import base_from_name, base_name_from_image, name_from_base

@@ -59,6 +63,18 @@
logger = logging.getLogger(__name__)


def is_pipeline_parameters(value):
[Review comment from a Contributor] nit: This seems to be used at multiple places, can we create a utility around this? (One possible helper shape is sketched after this function.)

"""Determine if a value is a pipeline parameter or function representation

Args:
value (float or int): The value to be verified.

Returns:
bool: True if it is, False otherwise.
"""
return isinstance(value, (PipelineParameter, PipelineJsonGet, PipelineJoin))
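
Following up on the reviewer's nit above, one possible shape for such a shared helper; the module path and names here are hypothetical and not part of this PR:

# Hypothetical shared helper (e.g. in sagemaker/workflow/utilities.py) so that
# parameter.py and tuner.py do not each repeat the isinstance tuple.
from sagemaker.workflow.parameters import Parameter as PipelineParameter
from sagemaker.workflow.functions import JsonGet as PipelineJsonGet
from sagemaker.workflow.functions import Join as PipelineJoin

PIPELINE_VARIABLE_TYPES = (PipelineParameter, PipelineJsonGet, PipelineJoin)


def is_pipeline_variable(value):
    """Return True if value is a pipeline parameter or function, False otherwise."""
    return isinstance(value, PIPELINE_VARIABLE_TYPES)


def to_string_unless_pipeline_variable(value):
    """Stringify plain values; pass pipeline parameters/functions through unchanged."""
    return value if is_pipeline_variable(value) else str(value)
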


class WarmStartTypes(Enum):
"""Warm Start Configuration type.

@@ -359,7 +375,12 @@ def _prepare_static_hyperparameters(
):
"""Prepare static hyperparameters for one estimator before tuning."""
# Remove any hyperparameter that will be tuned
static_hyperparameters = {str(k): str(v) for (k, v) in estimator.hyperparameters().items()}
static_hyperparameters = {
str(k): str(v)
if not isinstance(v, (PipelineParameter, PipelineJsonGet, PipelineJoin))
else v
for (k, v) in estimator.hyperparameters().items()
}
for hyperparameter_name in hyperparameter_ranges.keys():
static_hyperparameters.pop(hyperparameter_name, None)

52 changes: 46 additions & 6 deletions tests/integ/test_workflow.py
@@ -66,14 +66,13 @@
ConditionIn,
ConditionLessThanOrEqualTo,
)
from sagemaker.workflow.condition_step import ConditionStep, JsonGet
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum
from sagemaker.workflow.lambda_step import LambdaStep, LambdaOutput, LambdaOutputTypeEnum
from sagemaker.workflow.properties import PropertyFile
from sagemaker.wrangler.processing import DataWranglerProcessor
from sagemaker.dataset_definition.inputs import DatasetDefinition, AthenaDatasetDefinition
from sagemaker.workflow.execution_variables import ExecutionVariables
from sagemaker.workflow.functions import Join
from sagemaker.workflow.functions import Join, JsonGet
from sagemaker.wrangler.ingestion import generate_data_ingestion_flow_from_s3_input
from sagemaker.workflow.parameters import (
ParameterInteger,
@@ -87,6 +86,7 @@
TuningStep,
TransformStep,
TransformInput,
PropertyFile,
)
from sagemaker.workflow.step_collections import RegisterModel
from sagemaker.workflow.pipeline import Pipeline
@@ -137,7 +137,7 @@ def feature_store_session(sagemaker_session):

@pytest.fixture
def pipeline_name():
return f"my-pipeline-{int(time.time() * 10**7)}"
return f"my-pipeline-{int(time.time() * 10 ** 7)}"


@pytest.fixture
@@ -1371,6 +1371,8 @@ def test_tuning_multi_algos(
cpu_instance_type,
pipeline_name,
region_name,
script_dir,
athena_dataset_definition,
):
base_dir = os.path.join(DATA_DIR, "pytorch_mnist")
entry_point = os.path.join(base_dir, "mnist.py")
@@ -1382,6 +1384,42 @@
instance_count = ParameterInteger(name="InstanceCount", default_value=1)
instance_type = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")

input_data = f"s3://sagemaker-sample-data-{region_name}/processing/census/census-income.csv"

sklearn_processor = SKLearnProcessor(
framework_version="0.20.0",
instance_type=instance_type,
instance_count=instance_count,
base_job_name="test-sklearn",
sagemaker_session=sagemaker_session,
role=role,
)

property_file = PropertyFile(
name="DataAttributes", output_name="attributes", path="attributes.json"
)

step_process = ProcessingStep(
name="my-process",
display_name="ProcessingStep",
description="description for Processing step",
processor=sklearn_processor,
inputs=[
ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
ProcessingInput(dataset_definition=athena_dataset_definition),
],
outputs=[
ProcessingOutput(output_name="train_data", source="/opt/ml/processing/train"),
ProcessingOutput(output_name="attributes", source="/opt/ml/processing/attributes.json"),
],
property_files=[property_file],
code=os.path.join(script_dir, "preprocessing.py"),
)

static_hp_1 = ParameterString(name="InstanceType", default_value="ml.m5.xlarge")
json_get_hp = JsonGet(
step_name=step_process.name, property_file=property_file, json_path="train_size"
)
pytorch_estimator = PyTorch(
entry_point=entry_point,
role=role,
@@ -1392,10 +1430,11 @@
sagemaker_session=sagemaker_session,
enable_sagemaker_metrics=True,
max_retry_attempts=3,
hyperparameters={"static-hp": static_hp_1, "train_size": json_get_hp},
)

min_batch_size = ParameterString(name="MinBatchSize", default_value="64")
max_batch_size = ParameterString(name="MaxBatchSize", default_value="128")
max_batch_size = json_get_hp

tuner = HyperparameterTuner.create(
estimator_dict={
@@ -1415,6 +1454,7 @@
"estimator-2": [{"Name": "test:acc", "Regex": "Overall test accuracy: (.*?);"}],
},
)

inputs = {
"estimator-1": TrainingInput(s3_data=input_path),
"estimator-2": TrainingInput(s3_data=input_path),
@@ -1429,7 +1469,7 @@
pipeline = Pipeline(
name=pipeline_name,
parameters=[instance_count, instance_type, min_batch_size, max_batch_size],
steps=[step_tune],
steps=[step_process, step_tune],
sagemaker_session=sagemaker_session,
)

32 changes: 29 additions & 3 deletions tests/unit/test_tuner.py
@@ -30,6 +30,8 @@
create_transfer_learning_tuner,
HyperparameterTuner,
)
from sagemaker.workflow.functions import JsonGet, Join
from sagemaker.workflow.parameters import ParameterString, ParameterInteger

from .tuner_test_utils import * # noqa: F403

@@ -68,14 +70,24 @@ def tuner(estimator):


def test_prepare_for_training(tuner):
static_hyperparameters = {"validated": 1, "another_one": 0}
hp1 = JsonGet(step_name="stepname", property_file="pf", json_path="jp")
hp2 = Join(on="/", values=["1", "2", ParameterString(name="ps", default_value="3")])

static_hyperparameters = {
"validated": 1,
"another_one": 0,
"hp1": hp1,
"hp2": hp2,
}

tuner.estimator.set_hyperparameters(**static_hyperparameters)
tuner._prepare_for_tuning()

assert tuner._current_job_name.startswith(IMAGE_NAME)

assert len(tuner.static_hyperparameters) == 1
assert len(tuner.static_hyperparameters) == 3
assert tuner.static_hyperparameters["another_one"] == "0"
assert tuner.static_hyperparameters["hp1"] == hp1
assert tuner.static_hyperparameters["hp2"] == hp2


def test_prepare_for_tuning_with_amazon_estimator(tuner, sagemaker_session):
@@ -1156,6 +1168,20 @@ def test_integer_parameter_ranges():
assert ranges["ScalingType"] == "Auto"


def test_integer_parameter_ranges_with_pipeline_parameter():
min = ParameterInteger(name="p", default_value=2)
max = JsonGet(step_name="sn", property_file="pf", json_path="jp")
scale = ParameterString(name="scale", default_value="Auto")
int_param = IntegerParameter(min, max)
ranges = int_param.as_tuning_range("some")

assert len(ranges.keys()) == 4
assert ranges["Name"] == "some"
assert ranges["MinValue"] == min
assert ranges["MaxValue"] == max
assert ranges["ScalingType"] == scale


def test_integer_parameter_scaling_type():
int_param = IntegerParameter(2, 3, scaling_type="Linear")
int_range = int_param.as_tuning_range("range")