feature: support configurable retry for pipeline steps #2662
Merged
80 commits
868db55 add helper function to generate no-op (data ingestion only) recipe (jerrypeng7773)
21bedbb Merge branch 'aws:master' into master (jerrypeng7773)
854dd10 separate flow generation by source input type + move generation helpe… (jerrypeng7773)
8798b65 Merge branch 'aws:master' into master (jerrypeng7773)
69ae4bd create an internal helper function to generate output node (jerrypeng7773)
a6a8449 Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk (jerrypeng7773)
2aa256e Merge branch 'aws:master' into master (jerrypeng7773)
06557a8 add ingestion test using dw processor via pipeline execution (jerrypeng7773)
dcbfd13 Merge branch 'aws:master' into master (jerrypeng7773)
fc6522e verify the fg query df (jerrypeng7773)
b6f9371 Merge branch 'master' into master (ahsan-z-khan)
86fa47d fix tests (jerrypeng7773)
05ccfa6 Merge branch 'master' into master (ahsan-z-khan)
0716e9f Merge branch 'aws:master' into master (jerrypeng7773)
7ca5af4 add tuning step support (jerrypeng7773)
8cf18b8 fix docstyle check (jerrypeng7773)
1f95b82 add helper function to get tuning step top performing model s3 uri (jerrypeng7773)
1b9d66b Merge branch 'aws:master' into master (jerrypeng7773)
5bc47bd allow step depends on pass in step instance (jerrypeng7773)
603b934 Merge branch 'aws:master' into master (jerrypeng7773)
664f2a8 Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk (jerrypeng7773)
a8755ec Merge branch 'master' into master (apogupta2018)
e25d36c Merge branch 'aws:master' into master (jerrypeng7773)
a9cfab4 Merge branch 'master' into accept-step-object-in-dependson-list (jerrypeng7773)
c0066ea resolve merge conflict (jerrypeng7773)
e9ac9fa support passing step object to tuning step depends on list (jerrypeng7773)
eb6a523 fix test_workflow_with_clarify (jerrypeng7773)
c19c426 add tuning step to docs (jerrypeng7773)
450e4a5 allow step instance in depends on list for repack and reigster model … (jerrypeng7773)
cb7be4a Merge branch 'master' into master (ahsan-z-khan)
2918765 add tuning step get_top_model_s3_uri to doc (jerrypeng7773)
fe9bd70 Merge branch 'aws:master' into master (jerrypeng7773)
378c868 Merge branch 'master' of github.com:jerrypeng7773/sagemaker-python-sdk (jerrypeng7773)
93cdb68 remove extra new line (jerrypeng7773)
24226f9 add callback step to doc (jerrypeng7773)
001cac5 switch order in doc (jerrypeng7773)
b5c00c1 Merge branch 'master' into master (ahsan-z-khan)
3b75821 Merge branch 'master' into accept-step-object-in-dependson-list (ahsan-z-khan)
e70ae34 Merge branch 'aws:master' into master (jerrypeng7773)
0eaf41b Merge branch 'master' of https://github.com/aws/sagemaker-python-sdk … (jerrypeng7773)
dad08c4 fix formatting (jerrypeng7773)
edf9cba support parameterize tuning job parameter ranges (jerrypeng7773)
57bd90d Merge branch 'aws:master' into master (jerrypeng7773)
597bb74 Merge branch 'aws:master' into master (jerrypeng7773)
ae55619 support tuning step parameter range parameterization + support retry … (jerrypeng7773)
5a6148a Merge branch 'master' into master (ahsan-z-khan)
9b1d905 Merge branch 'master' into master (ahsan-z-khan)
282c9fe Merge branch 'master' into master (ahsan-z-khan)
7279588 Merge branch 'aws:master' into master (jerrypeng7773)
b92389b Merge branch 'aws:master' into master (jerrypeng7773)
3e7b04c add configurable retry support (jerrypeng7773)
dd1fef5 remove adding new default throttling retry policy (jerrypeng7773)
7715357 reformatting (jerrypeng7773)
12905cd doc: update experiment config doc on fit method (#2609) (danabens)
962a06e prepare release v2.59.1.post0
9c6c0c7 update development version to v2.59.2.dev0
c47e598 fix: unit tests for KIX and remove regional calls to boto (#2640) (shreyapandit)
d2b43d2 documentation: Remove Shortbread (#2610) (jkroll-aws)
7bb72e3 prepare release v2.59.2
0d435af update development version to v2.59.3.dev0
0e496ad Documentation: instance_type no longer hidden in instance_count docum… (daMichaelB)
ec44d50 prepare release v2.59.3
08d7c5b update development version to v2.59.4.dev0
95e161c documentation: Info about offline s3 bucket key when creating feature… (can-sun)
8f801f0 prepare release v2.59.3.post0
ba5d88d update development version to v2.59.4.dev0
5e9fdba fix: add pytorch 1.8.1 for huggingface (#2642) (jeniyat)
4fa9d18 Doc: updated pr template to check backward compatibility (#2655) (shreyapandit)
5a9e654 support pipeline step configurable retry
fb5140e Merge branch 'aws:master' into master (jerrypeng7773)
c435184 Merge branch 'master' into master (ahsan-z-khan)
1e24b0f make default retry parameters static vars (jerrypeng7773)
df628d7 remove unused import (jerrypeng7773)
6647ea6 Merge branch 'master' into master (jerrypeng7773)
02acf0c Merge branch 'master' into master (ahsan-z-khan)
0289290 Merge branch 'master' into master (jeniyat)
c07b2e3 Merge branch 'master' into master (ahsan-z-khan)
336e293 Merge branch 'master' into master (ahsan-z-khan)
a713c6d Merge branch 'master' into master (ahsan-z-khan)
7fee07b Merge branch 'master' into master (ahsan-z-khan)
@@ -0,0 +1,204 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Pipeline step retry policies for workflow."""
from __future__ import absolute_import

from enum import Enum
from typing import List
import attr

from sagemaker.workflow.entities import Entity, DefaultEnumMeta, RequestType


DEFAULT_BACKOFF_RATE = 2.0
DEFAULT_INTERVAL_SECONDS = 1
MAX_ATTEMPTS_CAP = 20
MAX_EXPIRE_AFTER_MIN = 14400


class StepExceptionTypeEnum(Enum, metaclass=DefaultEnumMeta):
    """Step ExceptionType enum."""

    SERVICE_FAULT = "Step.SERVICE_FAULT"
    THROTTLING = "Step.THROTTLING"


class SageMakerJobExceptionTypeEnum(Enum, metaclass=DefaultEnumMeta):
    """SageMaker Job ExceptionType enum."""

    INTERNAL_ERROR = "SageMaker.JOB_INTERNAL_ERROR"
    CAPACITY_ERROR = "SageMaker.CAPACITY_ERROR"
    RESOURCE_LIMIT = "SageMaker.RESOURCE_LIMIT"


@attr.s
class RetryPolicy(Entity):
    """RetryPolicy base class.

    Attributes:
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): The number of seconds to wait before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts (default: None)
        expire_after_mins (int): A positive integer that represents the number of
            minutes after which no further retry is attempted (default: None)
    """

    backoff_rate: float = attr.ib(default=DEFAULT_BACKOFF_RATE)
    interval_seconds: int = attr.ib(default=DEFAULT_INTERVAL_SECONDS)
    max_attempts: int = attr.ib(default=None)
    expire_after_mins: int = attr.ib(default=None)

    @backoff_rate.validator
    def validate_backoff_rate(self, _, value):
        """Validate the input backoff rate."""
        if value:
            assert value >= 0.0, "backoff_rate should be non-negative"

    @interval_seconds.validator
    def validate_interval_seconds(self, _, value):
        """Validate the input interval seconds."""
        if value:
            assert value >= 0.0, "interval_seconds should be non-negative"

    @max_attempts.validator
    def validate_max_attempts(self, _, value):
        """Validate the input max attempts."""
        if value:
            assert (
                MAX_ATTEMPTS_CAP >= value >= 1
            ), f"max_attempts must be in the range (0, {MAX_ATTEMPTS_CAP}]"

    @expire_after_mins.validator
    def validate_expire_after_mins(self, _, value):
        """Validate the input expire after mins."""
        if value:
            assert (
                MAX_EXPIRE_AFTER_MIN >= value >= 0
            ), f"expire_after_mins must be in the range [0, {MAX_EXPIRE_AFTER_MIN}] minutes"

    def to_request(self) -> RequestType:
        """Get the request structure for workflow service calls."""
        # max_attempts and expire_after_mins are mutually exclusive limits.
        if self.max_attempts is not None and self.expire_after_mins is not None:
            raise ValueError("Only one of [max_attempts] and [expire_after_mins] can be given.")

        request = {
            "BackoffRate": self.backoff_rate,
            "IntervalSeconds": self.interval_seconds,
        }

        if self.max_attempts:
            request["MaxAttempts"] = self.max_attempts

        if self.expire_after_mins:
            request["ExpireAfterMin"] = self.expire_after_mins

        return request


class StepRetryPolicy(RetryPolicy):
    """RetryPolicy for a retryable step.

    By default, the pipeline service retries
    `sagemaker.workflow.retry.StepExceptionTypeEnum.SERVICE_FAULT` and
    `sagemaker.workflow.retry.StepExceptionTypeEnum.THROTTLING` regardless of
    pipeline step type. For a step defined as retryable, you can override
    this behavior by specifying a StepRetryPolicy.

    Attributes:
        exception_types (List[StepExceptionTypeEnum]): The exception types to match
            for this policy
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): The number of seconds to wait before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts (default: None)
        expire_after_mins (int): A positive integer that represents the number of
            minutes after which no further retry is attempted (default: None)
    """

    def __init__(
        self,
        exception_types: List[StepExceptionTypeEnum],
        backoff_rate: float = DEFAULT_BACKOFF_RATE,
        interval_seconds: int = DEFAULT_INTERVAL_SECONDS,
        max_attempts: int = None,
        expire_after_mins: int = None,
    ):
        super().__init__(backoff_rate, interval_seconds, max_attempts, expire_after_mins)
        for exception_type in exception_types:
            if not isinstance(exception_type, StepExceptionTypeEnum):
                raise ValueError(f"{exception_type} is not of StepExceptionTypeEnum.")
        self.exception_types = exception_types

    def to_request(self) -> RequestType:
        """Gets the request structure for retry policy."""
        request = super().to_request()
        request["ExceptionType"] = [e.value for e in self.exception_types]
        return request


class SageMakerJobStepRetryPolicy(RetryPolicy):
    """RetryPolicy for exceptions thrown by a SageMaker Job.

    Attributes:
        exception_types (List[SageMakerJobExceptionTypeEnum]):
            The SageMaker exceptions to match for this policy. The SageMaker exceptions
            captured here are the exceptions thrown when the job is created
            synchronously, for instance the resource limit exception.
        failure_reason_types (List[SageMakerJobExceptionTypeEnum]): The SageMaker
            failure reason types to match for this policy. The failure reason type
            appears in the FailureReason field of the Describe response and indicates
            the runtime failure reason for a job.
        backoff_rate (float): The multiplier by which the retry interval increases
            during each attempt (default: 2.0)
        interval_seconds (int): The number of seconds to wait before the
            first retry attempt (default: 1)
        max_attempts (int): A positive integer that represents the maximum
            number of retry attempts (default: None)
        expire_after_mins (int): A positive integer that represents the number of
            minutes after which no further retry is attempted (default: None)
    """

    def __init__(
        self,
        exception_types: List[SageMakerJobExceptionTypeEnum] = None,
        failure_reason_types: List[SageMakerJobExceptionTypeEnum] = None,
        backoff_rate: float = DEFAULT_BACKOFF_RATE,
        interval_seconds: int = DEFAULT_INTERVAL_SECONDS,
        max_attempts: int = None,
        expire_after_mins: int = None,
    ):
        super().__init__(backoff_rate, interval_seconds, max_attempts, expire_after_mins)

        if not exception_types and not failure_reason_types:
            raise ValueError(
                "At least one of the [exception_types, failure_reason_types] needs to be given."
            )

        # Both lists are merged and sent under the single "ExceptionType" key.
        self.exception_type_list: List[SageMakerJobExceptionTypeEnum] = []
        if exception_types:
            self.exception_type_list += exception_types
        if failure_reason_types:
            self.exception_type_list += failure_reason_types

        for exception_type in self.exception_type_list:
            if not isinstance(exception_type, SageMakerJobExceptionTypeEnum):
                raise ValueError(f"{exception_type} is not of SageMakerJobExceptionTypeEnum.")

    def to_request(self) -> RequestType:
        """Gets the request structure for retry policy."""
        request = super().to_request()
        request["ExceptionType"] = [e.value for e in self.exception_type_list]
        return request
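
To make the request shape and the retry timing easier to review, here is a minimal standalone sketch that does not import the SDK. The helper names `build_retry_request` and `retry_delays` are hypothetical and not part of this PR. The schedule assumes the service multiplies the wait by `backoff_rate` after every attempt, which is how the docstrings read but is not confirmed by this diff.

```python
from typing import Dict, List, Optional, Union

RequestValue = Union[float, int, List[str]]


def build_retry_request(
    exception_types: List[str],
    backoff_rate: float = 2.0,
    interval_seconds: int = 1,
    max_attempts: Optional[int] = None,
    expire_after_mins: Optional[int] = None,
) -> Dict[str, RequestValue]:
    """Hypothetical helper that mirrors the dict built by to_request()."""
    # The two limits are mutually exclusive, as the error message in the diff states.
    if max_attempts is not None and expire_after_mins is not None:
        raise ValueError("Only one of [max_attempts] and [expire_after_mins] can be given.")

    request: Dict[str, RequestValue] = {
        "BackoffRate": backoff_rate,
        "IntervalSeconds": interval_seconds,
    }
    if max_attempts:
        request["MaxAttempts"] = max_attempts
    if expire_after_mins:
        request["ExpireAfterMin"] = expire_after_mins
    request["ExceptionType"] = list(exception_types)
    return request


def retry_delays(interval_seconds: int, backoff_rate: float, max_attempts: int) -> List[float]:
    """Assumed schedule: wait interval_seconds, then multiply by backoff_rate each time."""
    return [interval_seconds * backoff_rate ** attempt for attempt in range(max_attempts)]


if __name__ == "__main__":
    # "Step.THROTTLING" is the value of StepExceptionTypeEnum.THROTTLING in the diff.
    print(build_retry_request(["Step.THROTTLING"], max_attempts=3))
    print(retry_delays(interval_seconds=1, backoff_rate=2.0, max_attempts=4))
    # [1.0, 2.0, 4.0, 8.0]
```

With the defaults in this file (`backoff_rate=2.0`, `interval_seconds=1`), the assumed waits double on each attempt, so `MAX_ATTEMPTS_CAP = 20` would bound the last wait at 2**19 seconds under this reading.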