
feature: add dataset definition support for processing jobs #2031

Merged
merged 18 commits on Dec 16, 2020
3 changes: 3 additions & 0 deletions src/sagemaker/apiutils/_base_types.py
@@ -49,6 +49,9 @@ def from_boto(cls, boto_dict, **kwargs):
             boto_dict (dict): A dictionary of a boto response.
             **kwargs: Arbitrary keyword arguments
         """
+        if boto_dict is None:
+            return None
+
         boto_dict = {k: v for k, v in boto_dict.items() if k not in cls._boto_ignore()}
         custom_boto_names_to_member_names = {a: b for b, a in cls._custom_boto_names.items()}
         cls_kwargs = _boto_functions.from_boto(
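
The added guard lets from_boto be called on an optional nested structure that is absent from a boto response. A minimal sketch of the intended behavior, using a hypothetical Example subclass and assuming ApiObject accepts member values as constructor keyword arguments:

from sagemaker.apiutils._base_types import ApiObject


class Example(ApiObject):
    """Hypothetical ApiObject subclass, for illustration only."""

    name = None


# Without the guard, from_boto(None) would fail while filtering boto_dict items;
# with it, a missing (None) boto structure simply deserializes to None.
assert Example.from_boto(None) is None
assert Example.from_boto({"Name": "abc"}).name == "abc"
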
4 changes: 1 addition & 3 deletions src/sagemaker/apiutils/_boto_functions.py
@@ -68,9 +68,7 @@ def from_boto(boto_dict, boto_name_to_member_name, member_name_to_type):
             api_type, is_collection = member_name_to_type[member_name]
             if is_collection:
                 if isinstance(boto_value, dict):
-                    member_value = {
-                        key: api_type.from_boto(value) for key, value in boto_value.items()
-                    }
+                    member_value = api_type.from_boto(boto_value)
                 else:
                     member_value = [api_type.from_boto(item) for item in boto_value]
             else:
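
With this change, a dict-valued member registered as a custom boto collection type is deserialized as a single nested object instead of as a map whose values are deserialized one by one. A rough sketch of the new behavior, assuming from_boto maps CamelCase boto keys onto the underscored member names defined on the class (toy payload, for illustration only):

from sagemaker.dataset_definition.inputs import DatasetDefinition

# A boto-style fragment in which the collection member is itself a nested structure.
boto_value = {"DataDistributionType": "ShardedByS3Key", "InputMode": "Pipe"}

# Previously the dict comprehension called from_boto on every value ("ShardedByS3Key", ...),
# which cannot reconstruct the nested object; now the whole dict becomes one instance.
dataset_definition = DatasetDefinition.from_boto(boto_value)
print(dataset_definition.input_mode)  # "Pipe"
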
21 changes: 21 additions & 0 deletions src/sagemaker/dataset_definition/__init__.py
@@ -0,0 +1,21 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Classes for using DatasetDefinition in Processing job with Amazon SageMaker."""
from __future__ import absolute_import

from sagemaker.dataset_definition.inputs import (  # noqa: F401
    DatasetDefinition,
    S3Input,
    RedshiftDatasetDefinition,
    AthenaDatasetDefinition,
)
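
Because the package __init__ re-exports these names, they can be imported either from the package root or from the inputs module; for example (a usage sketch, not part of the diff):

from sagemaker.dataset_definition import AthenaDatasetDefinition, S3Input
from sagemaker.dataset_definition.inputs import DatasetDefinition, RedshiftDatasetDefinition
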
135 changes: 135 additions & 0 deletions src/sagemaker/dataset_definition/inputs.py
@@ -0,0 +1,135 @@
# Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""The input configs for DatasetDefinition.

DatasetDefinition supports the data sources like S3 which can be queried via Athena
and Redshift. A mechanism has to be created for customers to generate datasets
from Athena/Redshift queries and to retrieve the data, using Processing jobs
so as to make it available for other downstream processes.
"""
from __future__ import absolute_import

from sagemaker.apiutils._base_types import ApiObject


class RedshiftDatasetDefinition(ApiObject):
    """DatasetDefinition for Redshift.

    With this input, SQL queries are executed using Redshift to generate datasets in S3.

    Attributes:
        cluster_id (str): The Redshift cluster identifier.
        database (str): The Redshift database created for your cluster.
        db_user (str): The user name of a user account that has permission to connect
            to the database.
        query_string (str): The SQL query statements to be executed.
        cluster_role_arn (str): The Redshift cluster role ARN.
        output_s3_uri (str): The path to a specific S3 object or an S3 prefix for output.
        kms_key_id (str): The KMS key ID.
        output_format (str): The data storage format for Redshift query results.
            Valid options are "PARQUET" and "CSV".
        output_compression (str): The compression used for Redshift query results.
            Valid options are "None", "GZIP", "SNAPPY", "ZSTD", and "BZIP2".
    """

    cluster_id = None
    database = None
    db_user = None
    query_string = None
    cluster_role_arn = None
    output_s3_uri = None
    kms_key_id = None
    output_format = None
    output_compression = None


class AthenaDatasetDefinition(ApiObject):
    """DatasetDefinition for Athena.

    With this input, SQL queries are executed using Athena to generate datasets in S3.

    Attributes:
        catalog (str): The name of the data catalog used in the query execution.
        database (str): The name of the database used in the query execution.
        query_string (str): The SQL query statements to be executed.
        output_s3_uri (str): The path to a specific S3 object or an S3 prefix for output.
        work_group (str): The name of the workgroup in which the query is being started.
        kms_key_id (str): The KMS key ID.
        output_format (str): The data storage format for Athena query results.
            Valid options are "PARQUET", "ORC", "AVRO", "JSON", and "TEXTFILE".
        output_compression (str): The compression used for Athena query results.
            Valid options are "GZIP", "SNAPPY", and "ZLIB".
    """

    catalog = None
    database = None
    query_string = None
    output_s3_uri = None
    work_group = None
    kms_key_id = None
    output_format = None
    output_compression = None


class DatasetDefinition(ApiObject):
    """DatasetDefinition input.

    Attributes:
        data_distribution_type (str): Valid options are "FullyReplicated" or
            "ShardedByS3Key" (default).
        input_mode (str): Valid options are "Pipe" or "File" (default).
        local_path (str): The path to a local directory. If not provided, the SageMaker
            platform skips downloading the data.
        redshift_dataset_definition
            (:class:`~sagemaker.dataset_definition.RedshiftDatasetDefinition`): Redshift
            dataset definition.
        athena_dataset_definition (:class:`~sagemaker.dataset_definition.AthenaDatasetDefinition`):
            Athena dataset definition.
    """

    _custom_boto_types = {
        "redshift_dataset_definition": (RedshiftDatasetDefinition, True),
        "athena_dataset_definition": (AthenaDatasetDefinition, True),
    }

    data_distribution_type = "ShardedByS3Key"
    input_mode = "File"
    local_path = None
    redshift_dataset_definition = None
    athena_dataset_definition = None


class S3Input(ApiObject):
    """Metadata of data objects stored in S3.

    Two options are provided: specifying an S3 prefix, or explicitly listing the data
    files in a manifest file and referencing the manifest file's S3 path.
    Note: Strong consistency is not guaranteed if S3Prefix is provided here.
    S3 list operations are not strongly consistent.
    Use ManifestFile if strong consistency is required.

    Attributes:
        s3_uri (str): The path to a specific S3 object or an S3 prefix.
        local_path (str): The path to a local directory. If not provided, the SageMaker
            platform skips downloading the data.
        s3_data_type (str): Valid options are "ManifestFile" or "S3Prefix" (default).
        s3_input_mode (str): Valid options are "Pipe" or "File" (default).
        s3_data_distribution_type (str): Valid options are "FullyReplicated" (default) or
            "ShardedByS3Key".
        s3_compression_type (str): Valid options are "None" or "Gzip".
    """

    s3_uri = None
    local_path = None
    s3_data_type = "S3Prefix"
    s3_input_mode = "File"
    s3_data_distribution_type = "FullyReplicated"
    s3_compression_type = None
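
Taken together, these classes describe processing inputs backed either by a query or by objects in S3. A minimal usage sketch, assuming ApiObject accepts member values as constructor keyword arguments; the bucket, database, and table names are hypothetical, and wiring these objects into a Processing job (e.g. through ProcessingInput) happens outside this module:

from sagemaker.dataset_definition.inputs import (
    AthenaDatasetDefinition,
    DatasetDefinition,
    S3Input,
)

# Dataset generated by an Athena query and written as Parquet under output_s3_uri.
athena_dataset = DatasetDefinition(
    local_path="/opt/ml/processing/input/athena",
    data_distribution_type="FullyReplicated",
    input_mode="File",
    athena_dataset_definition=AthenaDatasetDefinition(
        catalog="AwsDataCatalog",
        database="my_database",  # hypothetical database
        query_string="SELECT * FROM my_table",  # hypothetical table
        output_s3_uri="s3://my-bucket/athena-results/",  # hypothetical bucket
        output_format="PARQUET",
    ),
)

# Existing S3 objects referenced by prefix (see the ManifestFile note above on consistency).
s3_input = S3Input(
    s3_uri="s3://my-bucket/raw-data/",  # hypothetical prefix
    local_path="/opt/ml/processing/input/raw",
    s3_data_type="S3Prefix",
    s3_input_mode="File",
)
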