change: Implement test mechanism for Pipeline variables #3163

Closed · wants to merge 6 commits
70 changes: 43 additions & 27 deletions src/sagemaker/algorithm.py
@@ -13,15 +13,22 @@
"""Test docstring"""
from __future__ import absolute_import

from typing import Optional, Union, Dict, List

import sagemaker
import sagemaker.parameter
from sagemaker import vpc_utils
from sagemaker.deserializers import BytesDeserializer
from sagemaker.deprecations import removed_kwargs
from sagemaker.estimator import EstimatorBase
from sagemaker.inputs import TrainingInput, FileSystemInput
from sagemaker.serializers import IdentitySerializer
from sagemaker.transformer import Transformer
from sagemaker.predictor import Predictor
from sagemaker.session import Session
from sagemaker.workflow.entities import PipelineVariable

from sagemaker.workflow import is_pipeline_variable


class AlgorithmEstimator(EstimatorBase):
@@ -37,28 +44,28 @@ class AlgorithmEstimator(EstimatorBase):

    def __init__(
        self,
-        algorithm_arn,
-        role,
-        instance_count,
-        instance_type,
-        volume_size=30,
-        volume_kms_key=None,
-        max_run=24 * 60 * 60,
-        input_mode="File",
-        output_path=None,
-        output_kms_key=None,
-        base_job_name=None,
-        sagemaker_session=None,
-        hyperparameters=None,
-        tags=None,
-        subnets=None,
-        security_group_ids=None,
-        model_uri=None,
-        model_channel_name="model",
-        metric_definitions=None,
-        encrypt_inter_container_traffic=False,
-        use_spot_instances=False,
-        max_wait=None,
+        algorithm_arn: str,
+        role: str,
+        instance_count: Optional[Union[int, PipelineVariable]] = None,
+        instance_type: Optional[Union[str, PipelineVariable]] = None,
+        volume_size: Union[int, PipelineVariable] = 30,
+        volume_kms_key: Optional[Union[str, PipelineVariable]] = None,
+        max_run: Union[int, PipelineVariable] = 24 * 60 * 60,
+        input_mode: Union[str, PipelineVariable] = "File",
+        output_path: Optional[Union[str, PipelineVariable]] = None,
+        output_kms_key: Optional[Union[str, PipelineVariable]] = None,
+        base_job_name: Optional[str] = None,
+        sagemaker_session: Optional[Session] = None,
+        hyperparameters: Optional[Dict[str, Union[str, PipelineVariable]]] = None,
+        tags: Optional[List[Dict[str, Union[str, PipelineVariable]]]] = None,
+        subnets: Optional[List[Union[str, PipelineVariable]]] = None,
+        security_group_ids: Optional[List[Union[str, PipelineVariable]]] = None,
+        model_uri: Optional[str] = None,
+        model_channel_name: Union[str, PipelineVariable] = "model",
+        metric_definitions: Optional[List[Dict[str, Union[str, PipelineVariable]]]] = None,
+        encrypt_inter_container_traffic: Union[bool, PipelineVariable] = False,
+        use_spot_instances: Union[bool, PipelineVariable] = False,
+        max_wait: Optional[Union[int, PipelineVariable]] = None,
        **kwargs  # pylint: disable=W0613
    ):
        """Initialize an ``AlgorithmEstimator`` instance.
@@ -186,22 +193,25 @@ def validate_train_spec(self):
        # Check that the input mode provided is compatible with the training input modes for the
        # algorithm.
        input_modes = self._algorithm_training_input_modes(train_spec["TrainingChannels"])
-        if self.input_mode not in input_modes:
+        if not is_pipeline_variable(self.input_mode) and self.input_mode not in input_modes:
            raise ValueError(
                "Invalid input mode: %s. %s only supports: %s"
                % (self.input_mode, algorithm_name, input_modes)
            )

        # Check that the training instance type is compatible with the algorithm.
        supported_instances = train_spec["SupportedTrainingInstanceTypes"]
-        if self.instance_type not in supported_instances:
+        if (
+            not is_pipeline_variable(self.instance_type)
+            and self.instance_type not in supported_instances
+        ):
            raise ValueError(
                "Invalid instance_type: %s. %s supports the following instance types: %s"
                % (self.instance_type, algorithm_name, supported_instances)
            )

        # Verify if distributed training is supported by the algorithm
-        if (
+        if not is_pipeline_variable(self.instance_count) and (
            self.instance_count > 1
            and "SupportsDistributedTraining" in train_spec
            and not train_spec["SupportsDistributedTraining"]
@@ -414,12 +424,18 @@ def _prepare_for_training(self, job_name=None):

        super(AlgorithmEstimator, self)._prepare_for_training(job_name)

-    def fit(self, inputs=None, wait=True, logs=True, job_name=None):
+    def fit(
+        self,
+        inputs: Optional[Union[str, Dict, TrainingInput, FileSystemInput]] = None,
+        wait: bool = True,
+        logs: bool = True,
+        job_name: Optional[str] = None,
+    ):
        """Placeholder docstring"""
        if inputs:
            self._validate_input_channels(inputs)

-        super(AlgorithmEstimator, self).fit(inputs, wait, logs, job_name)
+        return super(AlgorithmEstimator, self).fit(inputs, wait, logs, job_name)

    def _validate_input_channels(self, channels):
        """Placeholder docstring"""
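Note on the pattern above: every guard skips client-side validation when the value is a pipeline placeholder, because a placeholder is only resolved when the pipeline executes. A minimal sketch of the idea using the public `is_pipeline_variable` helper (the `supported_modes` list here is illustrative, not taken from a real algorithm spec):

```python
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.parameters import ParameterString

# A placeholder: its concrete value is unknown until the pipeline runs.
input_mode = ParameterString(name="InputMode", default_value="File")

supported_modes = ["File", "Pipe"]  # illustrative values only

# Mirrors the guard added to validate_train_spec(): only concrete strings
# are checked locally; placeholders are deferred to SageMaker at run time.
if not is_pipeline_variable(input_mode) and input_mode not in supported_modes:
    raise ValueError("Invalid input mode: %s" % input_mode)
```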
43 changes: 29 additions & 14 deletions src/sagemaker/amazon/amazon_estimator.py
@@ -13,6 +13,8 @@
"""Placeholder docstring"""
from __future__ import absolute_import

from typing import Optional, Union, Dict

import json
import logging
import tempfile
@@ -27,7 +29,10 @@
from sagemaker.estimator import EstimatorBase, _TrainingJob
from sagemaker.inputs import FileSystemInput, TrainingInput
from sagemaker.utils import sagemaker_timestamp
from sagemaker.workflow.entities import PipelineVariable
from sagemaker.workflow.pipeline_context import runnable_by_pipeline
from sagemaker.workflow.parameters import ParameterBoolean
from sagemaker.workflow import is_pipeline_variable

logger = logging.getLogger(__name__)

@@ -40,16 +45,16 @@ class AmazonAlgorithmEstimatorBase(EstimatorBase):

    feature_dim = hp("feature_dim", validation.gt(0), data_type=int)
    mini_batch_size = hp("mini_batch_size", validation.gt(0), data_type=int)
-    repo_name = None
-    repo_version = None
+    repo_name: Optional[str] = None
+    repo_version: Optional[str] = None

    def __init__(
        self,
-        role,
-        instance_count=None,
-        instance_type=None,
-        data_location=None,
-        enable_network_isolation=False,
+        role: str,
+        instance_count: Optional[int] = None,
+        instance_type: Optional[Union[str, PipelineVariable]] = None,
+        data_location: Optional[str] = None,
+        enable_network_isolation: Union[bool, ParameterBoolean] = False,
        **kwargs
    ):
        """Initialize an AmazonAlgorithmEstimatorBase.
@@ -113,6 +118,11 @@ def data_location(self):
    @data_location.setter
    def data_location(self, data_location):
        """Placeholder docstring"""
+        if is_pipeline_variable(data_location):
+            raise ValueError(
+                "data_location argument has to be a string "
+                "rather than a pipeline variable"
+            )
+
        if not data_location.startswith("s3://"):
            raise ValueError(
                'Expecting an S3 URL beginning with "s3://". Got "{}"'.format(data_location)
@@ -196,12 +206,12 @@ def _prepare_for_training(self, records, mini_batch_size=None, job_name=None):
    @runnable_by_pipeline
    def fit(
        self,
-        records,
-        mini_batch_size=None,
-        wait=True,
-        logs=True,
-        job_name=None,
-        experiment_config=None,
+        records: "RecordSet",
+        mini_batch_size: Optional[int] = None,
+        wait: bool = True,
+        logs: bool = True,
+        job_name: Optional[str] = None,
+        experiment_config: Optional[Dict[str, str]] = None,
    ):
        """Fit this Estimator on serialized Record objects, stored in S3.

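The `@runnable_by_pipeline` decorator is also why `AlgorithmEstimator.fit` in the first file now returns `super().fit(...)`: inside a pipeline session the decorated call is captured as step arguments rather than executed immediately, and that return value has to propagate to the caller. A simplified sketch of the dispatch idea, not the SDK's actual implementation (the `_within_pipeline` flag is hypothetical):

```python
import functools


def runnable_by_pipeline_sketch(method):
    """Illustrative stand-in for the decorator in sagemaker.workflow.pipeline_context."""

    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        session = getattr(self, "sagemaker_session", None)
        if getattr(session, "_within_pipeline", False):  # hypothetical flag
            # Inside a pipeline definition: return captured step arguments
            # instead of launching a training job right away.
            return {"method": method.__name__, "args": args, "kwargs": kwargs}
        return method(self, *args, **kwargs)

    return wrapper
```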
@@ -304,7 +314,12 @@ class RecordSet(object):
"""Placeholder docstring"""

def __init__(
self, s3_data, num_records, feature_dim, s3_data_type="ManifestFile", channel="train"
self,
s3_data: Union[str, PipelineVariable],
num_records: int,
feature_dim: int,
s3_data_type: Union[str, PipelineVariable] = "ManifestFile",
channel: Union[str, PipelineVariable] = "train",
):
"""A collection of Amazon :class:~`Record` objects serialized and stored in S3.

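`data_location` is the one argument here that must stay a concrete string: the SDK uses it to build S3 upload paths on the client side, before any pipeline runs, so the setter rejects placeholders outright. A standalone mirror of that logic for illustration (not the SDK code itself):

```python
from sagemaker.workflow import is_pipeline_variable


def check_data_location(data_location):
    """Illustrative mirror of the data_location setter's validation."""
    if is_pipeline_variable(data_location):
        raise ValueError(
            "data_location argument has to be a string "
            "rather than a pipeline variable"
        )
    if not data_location.startswith("s3://"):
        raise ValueError(
            'Expecting an S3 URL beginning with "s3://". Got "{}"'.format(data_location)
        )


check_data_location("s3://example-bucket/prefix")  # hypothetical bucket; passes
```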
106 changes: 53 additions & 53 deletions src/sagemaker/amazon/factorization_machines.py
@@ -37,83 +37,83 @@ class FactorizationMachines(AmazonAlgorithmEstimatorBase):
    sparse datasets economically.
    """

repo_name = "factorization-machines"
repo_version = 1
repo_name: str = "factorization-machines"
repo_version: int = 1

num_factors = hp("num_factors", gt(0), "An integer greater than zero", int)
predictor_type = hp(
num_factors: hp = hp("num_factors", gt(0), "An integer greater than zero", int)
predictor_type: hp = hp(
"predictor_type",
isin("binary_classifier", "regressor"),
'Value "binary_classifier" or "regressor"',
str,
)
epochs = hp("epochs", gt(0), "An integer greater than 0", int)
clip_gradient = hp("clip_gradient", (), "A float value", float)
eps = hp("eps", (), "A float value", float)
rescale_grad = hp("rescale_grad", (), "A float value", float)
bias_lr = hp("bias_lr", ge(0), "A non-negative float", float)
linear_lr = hp("linear_lr", ge(0), "A non-negative float", float)
factors_lr = hp("factors_lr", ge(0), "A non-negative float", float)
bias_wd = hp("bias_wd", ge(0), "A non-negative float", float)
linear_wd = hp("linear_wd", ge(0), "A non-negative float", float)
factors_wd = hp("factors_wd", ge(0), "A non-negative float", float)
bias_init_method = hp(
epochs: hp = hp("epochs", gt(0), "An integer greater than 0", int)
clip_gradient: hp = hp("clip_gradient", (), "A float value", float)
eps: hp = hp("eps", (), "A float value", float)
rescale_grad: hp = hp("rescale_grad", (), "A float value", float)
bias_lr: hp = hp("bias_lr", ge(0), "A non-negative float", float)
linear_lr: hp = hp("linear_lr", ge(0), "A non-negative float", float)
factors_lr: hp = hp("factors_lr", ge(0), "A non-negative float", float)
bias_wd: hp = hp("bias_wd", ge(0), "A non-negative float", float)
linear_wd: hp = hp("linear_wd", ge(0), "A non-negative float", float)
factors_wd: hp = hp("factors_wd", ge(0), "A non-negative float", float)
bias_init_method: hp = hp(
"bias_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
bias_init_scale = hp("bias_init_scale", ge(0), "A non-negative float", float)
bias_init_sigma = hp("bias_init_sigma", ge(0), "A non-negative float", float)
bias_init_value = hp("bias_init_value", (), "A float value", float)
linear_init_method = hp(
bias_init_scale: hp = hp("bias_init_scale", ge(0), "A non-negative float", float)
bias_init_sigma: hp = hp("bias_init_sigma", ge(0), "A non-negative float", float)
bias_init_value: hp = hp("bias_init_value", (), "A float value", float)
linear_init_method: hp = hp(
"linear_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
linear_init_scale = hp("linear_init_scale", ge(0), "A non-negative float", float)
linear_init_sigma = hp("linear_init_sigma", ge(0), "A non-negative float", float)
linear_init_value = hp("linear_init_value", (), "A float value", float)
factors_init_method = hp(
linear_init_scale: hp = hp("linear_init_scale", ge(0), "A non-negative float", float)
linear_init_sigma: hp = hp("linear_init_sigma", ge(0), "A non-negative float", float)
linear_init_value: hp = hp("linear_init_value", (), "A float value", float)
factors_init_method: hp = hp(
"factors_init_method",
isin("normal", "uniform", "constant"),
'Value "normal", "uniform" or "constant"',
str,
)
factors_init_scale = hp("factors_init_scale", ge(0), "A non-negative float", float)
factors_init_sigma = hp("factors_init_sigma", ge(0), "A non-negative float", float)
factors_init_value = hp("factors_init_value", (), "A float value", float)
factors_init_scale: hp = hp("factors_init_scale", ge(0), "A non-negative float", float)
factors_init_sigma: hp = hp("factors_init_sigma", ge(0), "A non-negative float", float)
factors_init_value: hp = hp("factors_init_value", (), "A float value", float)

    def __init__(
        self,
-        role,
-        instance_count=None,
-        instance_type=None,
-        num_factors=None,
-        predictor_type=None,
-        epochs=None,
-        clip_gradient=None,
-        eps=None,
-        rescale_grad=None,
-        bias_lr=None,
-        linear_lr=None,
-        factors_lr=None,
-        bias_wd=None,
-        linear_wd=None,
-        factors_wd=None,
-        bias_init_method=None,
-        bias_init_scale=None,
-        bias_init_sigma=None,
-        bias_init_value=None,
-        linear_init_method=None,
-        linear_init_scale=None,
-        linear_init_sigma=None,
-        linear_init_value=None,
-        factors_init_method=None,
-        factors_init_scale=None,
-        factors_init_sigma=None,
-        factors_init_value=None,
+        role: str,
+        instance_count: Optional[Union[int, PipelineVariable]] = None,
+        instance_type: Optional[Union[str, PipelineVariable]] = None,
+        num_factors: Optional[int] = None,
+        predictor_type: Optional[str] = None,
+        epochs: Optional[int] = None,
+        clip_gradient: Optional[float] = None,
+        eps: Optional[float] = None,
+        rescale_grad: Optional[float] = None,
+        bias_lr: Optional[float] = None,
+        linear_lr: Optional[float] = None,
+        factors_lr: Optional[float] = None,
+        bias_wd: Optional[float] = None,
+        linear_wd: Optional[float] = None,
+        factors_wd: Optional[float] = None,
+        bias_init_method: Optional[str] = None,
+        bias_init_scale: Optional[float] = None,
+        bias_init_sigma: Optional[float] = None,
+        bias_init_value: Optional[float] = None,
+        linear_init_method: Optional[str] = None,
+        linear_init_scale: Optional[float] = None,
+        linear_init_sigma: Optional[float] = None,
+        linear_init_value: Optional[float] = None,
+        factors_init_method: Optional[str] = None,
+        factors_init_scale: Optional[float] = None,
+        factors_init_sigma: Optional[float] = None,
+        factors_init_value: Optional[float] = None,
        **kwargs
    ):
        """Factorization Machines is an :class:`Estimator` for general-purpose supervised learning.
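The annotations in this file document which constructor arguments may now carry pipeline variables: infrastructure settings such as `instance_type` accept placeholders, while the algorithm's hyperparameters stay plain Python values. A hedged usage sketch (the role ARN is a placeholder, and configured AWS credentials and region are assumed):

```python
from sagemaker.amazon.factorization_machines import FactorizationMachines
from sagemaker.workflow.parameters import ParameterString

# Placeholder resolved only at pipeline execution time.
train_instance_type = ParameterString(
    name="TrainInstanceType", default_value="ml.c5.xlarge"
)

fm = FactorizationMachines(
    role="arn:aws:iam::123456789012:role/SageMakerRole",  # hypothetical role
    instance_count=1,
    instance_type=train_instance_type,  # a pipeline variable is now accepted
    num_factors=10,
    predictor_type="regressor",
)
```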
1 change: 0 additions & 1 deletion src/sagemaker/amazon/hyperparameter.py
@@ -14,7 +14,6 @@
from __future__ import absolute_import

import json
-
from sagemaker.workflow import is_pipeline_variable


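Taken together, the changes implement what the PR title calls a test mechanism for pipeline variables: annotate which arguments may be placeholders, and guard the client-side checks that placeholders cannot satisfy. A minimal unit test in that spirit (name and structure hypothetical, not copied from the PR's test files):

```python
from sagemaker.workflow import is_pipeline_variable
from sagemaker.workflow.parameters import ParameterString


def test_is_pipeline_variable_distinguishes_placeholders():
    # Placeholders are flagged; concrete values are not.
    assert is_pipeline_variable(ParameterString(name="InstanceType"))
    assert not is_pipeline_variable("ml.c5.xlarge")
```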