Skip to content

breaking: Move _CsvSerializer to sagemaker.serializers.CSVSerializer #1700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions doc/frameworks/tensorflow/deploying_tensorflow_serving.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ your input data to CSV format:

# create a Predictor with JSON serialization

predictor = Predictor('endpoint-name', serializer=sagemaker.predictor.csv_serializer)
predictor = Predictor('endpoint-name', serializer=sagemaker.serializers.CSVSerializer())

# CSV-formatted string input
input = '1.0,2.0,5.0\n1.0,2.0,5.0\n1.0,2.0,5.0'
Expand All @@ -256,7 +256,7 @@ your input data to CSV format:
]
}

You can also use python arrays or numpy arrays as input and let the `csv_serializer` object
You can also use python arrays or numpy arrays as input and let the ``CSVSerializer`` object
convert them to CSV, but the client-side CSV conversion is more sophisticated than the
CSV parsing on the Endpoint, so if you encounter conversion problems, try using one of the
JSON options instead.
Expand Down
4 changes: 2 additions & 2 deletions doc/frameworks/tensorflow/using_tf.rst
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ your input data to CSV format:

# create a Predictor with JSON serialization

predictor = Predictor('endpoint-name', serializer=sagemaker.predictor.csv_serializer)
predictor = Predictor('endpoint-name', serializer=sagemaker.serializers.CSVSerializer())

# CSV-formatted string input
input = '1.0,2.0,5.0\n1.0,2.0,5.0\n1.0,2.0,5.0'
Expand All @@ -726,7 +726,7 @@ your input data to CSV format:
]
}

You can also use python arrays or numpy arrays as input and let the `csv_serializer` object
You can also use python arrays or numpy arrays as input and let the ``CSVSerializer`` object
convert them to CSV, but the client-side CSV conversion is more sophisticated than the
CSV parsing on the Endpoint, so if you encounter conversion problems, try using one of the
JSON options instead.
Expand Down
5 changes: 3 additions & 2 deletions src/sagemaker/amazon/ipinsights.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
from sagemaker.amazon.hyperparameter import Hyperparameter as hp # noqa
from sagemaker.amazon.validation import ge, le
from sagemaker.deserializers import JSONDeserializer
from sagemaker.predictor import Predictor, csv_serializer
from sagemaker.predictor import Predictor
from sagemaker.model import Model
from sagemaker.serializers import CSVSerializer
from sagemaker.session import Session
from sagemaker.vpc_utils import VPC_CONFIG_DEFAULT

Expand Down Expand Up @@ -198,7 +199,7 @@ def __init__(self, endpoint_name, sagemaker_session=None):
super(IPInsightsPredictor, self).__init__(
endpoint_name,
sagemaker_session,
serializer=csv_serializer,
serializer=CSVSerializer(),
deserializer=JSONDeserializer(),
)

Expand Down
107 changes: 0 additions & 107 deletions src/sagemaker/predictor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,6 @@
"""Placeholder docstring"""
from __future__ import print_function, absolute_import

import csv
from six import StringIO
import numpy as np

from sagemaker.content_types import CONTENT_TYPE_CSV
from sagemaker.deserializers import BaseDeserializer
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.serializers import BaseSerializer
Expand Down Expand Up @@ -490,105 +485,3 @@ def deserialize(self, data, content_type):
def ACCEPT(self):
"""The content type that is expected from the inference endpoint."""
return self.accept


class _CsvSerializer(object):
    """Legacy callable serializer that converts Python data into CSV text."""

    def __init__(self):
        """Record the MIME type advertised for serialized payloads."""
        self.content_type = CONTENT_TYPE_CSV

    def __call__(self, data):
        """Take data of various data formats and serialize them into CSV.

        Args:
            data (object): Data to be serialized.

        Returns:
            object: Sequence of bytes to be used for the request body.
        """
        # A mutable sequence whose first element is itself sequence-like is
        # treated as multiple rows, serialized individually and joined by
        # newlines.  The `and` chain short-circuits, so len(data)/data[0]
        # are only touched for mutable-sequence-like inputs.
        is_multi_row = (
            _is_mutable_sequence_like(data)
            and len(data) > 0
            and _is_sequence_like(data[0])
        )
        if is_multi_row:
            return "\n".join(_CsvSerializer._serialize_row(row) for row in data)
        return _CsvSerializer._serialize_row(data)

    @staticmethod
    def _serialize_row(data):
        """Serialize a single row of input into CSV text.

        Args:
            data: A string, NumPy array, sized sequence, or readable buffer.

        Returns:
            str: The row serialized as CSV.
        """
        if isinstance(data, str):
            # Strings are assumed to already be CSV-formatted.
            return data
        if isinstance(data, np.ndarray):
            data = np.ndarray.flatten(data)
        if hasattr(data, "__len__"):
            if not len(data):
                raise ValueError("Cannot serialize empty array")
            return _csv_serialize_python_array(data)

        # Files and buffers are read out verbatim.
        if hasattr(data, "read"):
            return _csv_serialize_from_buffer(data)

        raise ValueError("Unable to handle input format: ", type(data))


def _csv_serialize_python_array(data):
    """Serialize a Python sequence as a single CSV row.

    Args:
        data: Iterable of values forming one CSV row.

    Returns:
        str: The comma-delimited row produced by ``_csv_serialize_object``.
    """
    return _csv_serialize_object(data)


def _csv_serialize_from_buffer(buff):
"""
Args:
buff:
"""
return buff.read()


def _csv_serialize_object(data):
"""
Args:
data:
"""
csv_buffer = StringIO()

csv_writer = csv.writer(csv_buffer, delimiter=",")
csv_writer.writerow(data)
return csv_buffer.getvalue().rstrip("\r\n")


# Module-level singleton used directly as a serializer callable
# (e.g. ``serializer=csv_serializer``).
csv_serializer = _CsvSerializer()


def _is_mutable_sequence_like(obj):
"""
Args:
obj:
"""
return _is_sequence_like(obj) and hasattr(obj, "__setitem__")


def _is_sequence_like(obj):
"""
Args:
obj:
"""
return hasattr(obj, "__iter__") and hasattr(obj, "__getitem__")


def _row_to_csv(obj):
"""
Args:
obj:
"""
if isinstance(obj, str):
return obj
return ",".join(obj)
57 changes: 57 additions & 0 deletions src/sagemaker/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from __future__ import absolute_import

import abc
import csv
import io
import json

Expand Down Expand Up @@ -44,6 +45,62 @@ def CONTENT_TYPE(self):
"""The MIME type of the data sent to the inference endpoint."""


class CSVSerializer(BaseSerializer):
    """Serialize data of various formats to a CSV-formatted string."""

    CONTENT_TYPE = "text/csv"

    def serialize(self, data):
        """Serialize data of various formats to a CSV-formatted string.

        Args:
            data (object): Data to be serialized. Can be a NumPy array, list,
                file, or buffer.

        Returns:
            str: The data serialized as a CSV-formatted string.

        Raises:
            ValueError: If the data is an empty sequence or an unsupported type.
        """
        if hasattr(data, "read"):
            return data.read()

        # Evaluate lazily inside one short-circuiting condition: computing
        # len(data)/data[0] eagerly would raise TypeError for scalars and
        # iterators that lack __len__/__getitem__, instead of letting them
        # fall through to _serialize_row's descriptive ValueError (which is
        # how the legacy _CsvSerializer behaved).
        if (
            self._is_sequence_like(data)
            and hasattr(data, "__setitem__")
            and len(data) > 0
            and self._is_sequence_like(data[0])
        ):
            # Multiple "rows": serialize each and join with newlines.
            return "\n".join(self._serialize_row(row) for row in data)

        return self._serialize_row(data)

    def _serialize_row(self, data):
        """Serialize data as a CSV-formatted row.

        Args:
            data (object): Data to be serialized in a row.

        Returns:
            str: The data serialized as a CSV-formatted row.

        Raises:
            ValueError: If the data is an empty sequence or an unsupported type.
        """
        if isinstance(data, str):
            # Strings are assumed to already be CSV-formatted.
            return data

        if isinstance(data, np.ndarray):
            data = np.ndarray.flatten(data)

        if hasattr(data, "__len__"):
            if len(data) == 0:
                raise ValueError("Cannot serialize empty array")
            csv_buffer = io.StringIO()
            csv_writer = csv.writer(csv_buffer, delimiter=",")
            csv_writer.writerow(data)
            # csv.writer terminates the row with \r\n; strip it off.
            return csv_buffer.getvalue().rstrip("\r\n")

        raise ValueError("Unable to handle input format: ", type(data))

    def _is_sequence_like(self, data):
        """Returns true if obj is iterable and subscriptable."""
        return hasattr(data, "__iter__") and hasattr(data, "__getitem__")


class NumpySerializer(BaseSerializer):
"""Serialize data to a buffer using the .npy format."""

Expand Down
4 changes: 2 additions & 2 deletions src/sagemaker/sparkml/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from sagemaker import Model, Predictor, Session
from sagemaker.content_types import CONTENT_TYPE_CSV
from sagemaker.fw_registry import registry
from sagemaker.predictor import csv_serializer
from sagemaker.serializers import CSVSerializer

framework_name = "sparkml-serving"
repo_name = "sagemaker-sparkml-serving"
Expand Down Expand Up @@ -51,7 +51,7 @@ def __init__(self, endpoint_name, sagemaker_session=None):
super(SparkMLPredictor, self).__init__(
endpoint_name=endpoint_name,
sagemaker_session=sagemaker_session,
serializer=csv_serializer,
serializer=CSVSerializer(),
content_type=CONTENT_TYPE_CSV,
)

Expand Down
8 changes: 3 additions & 5 deletions tests/integ/test_marketplace.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sagemaker
import tests.integ
from sagemaker import AlgorithmEstimator, ModelPackage
from sagemaker.serializers import CSVSerializer
from sagemaker.tuner import IntegerParameter, HyperparameterTuner
from sagemaker.utils import sagemaker_timestamp
from sagemaker.utils import _aws_partition
Expand Down Expand Up @@ -136,10 +137,7 @@ def test_marketplace_attach(sagemaker_session, cpu_instance_type):
training_job_name=training_job_name, sagemaker_session=sagemaker_session
)
predictor = estimator.deploy(
1,
cpu_instance_type,
endpoint_name=endpoint_name,
serializer=sagemaker.predictor.csv_serializer,
1, cpu_instance_type, endpoint_name=endpoint_name, serializer=CSVSerializer()
)
shape = pandas.read_csv(os.path.join(data_path, "iris.csv"), header=None)
a = [50 * i for i in range(3)]
Expand All @@ -165,7 +163,7 @@ def test_marketplace_model(sagemaker_session, cpu_instance_type):
)

def predict_wrapper(endpoint, session):
return sagemaker.Predictor(endpoint, session, serializer=sagemaker.predictor.csv_serializer)
return sagemaker.Predictor(endpoint, session, serializer=CSVSerializer())

model = ModelPackage(
role="SageMakerRole",
Expand Down
7 changes: 4 additions & 3 deletions tests/integ/test_multi_variant_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from sagemaker.content_types import CONTENT_TYPE_CSV
from sagemaker.utils import unique_name_from_base
from sagemaker.amazon.amazon_estimator import get_image_uri
from sagemaker.predictor import csv_serializer, Predictor
from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer


import tests.integ
Expand Down Expand Up @@ -169,7 +170,7 @@ def test_predict_invocation_with_target_variant(sagemaker_session, multi_variant
predictor = Predictor(
endpoint_name=multi_variant_endpoint.endpoint_name,
sagemaker_session=sagemaker_session,
serializer=csv_serializer,
serializer=CSVSerializer(),
content_type=CONTENT_TYPE_CSV,
accept=CONTENT_TYPE_CSV,
)
Expand Down Expand Up @@ -297,7 +298,7 @@ def test_predict_invocation_with_target_variant_local_mode(
predictor = Predictor(
endpoint_name=multi_variant_endpoint.endpoint_name,
sagemaker_session=sagemaker_session,
serializer=csv_serializer,
serializer=CSVSerializer(),
content_type=CONTENT_TYPE_CSV,
accept=CONTENT_TYPE_CSV,
)
Expand Down
5 changes: 2 additions & 3 deletions tests/integ/test_tfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tests.integ
import tests.integ.timeout
from sagemaker.tensorflow.model import TensorFlowModel, TensorFlowPredictor
from sagemaker.serializers import CSVSerializer


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -236,9 +237,7 @@ def test_predict_csv(tfs_predictor):
expected_result = {"predictions": [[3.5, 4.0, 5.5], [3.5, 4.0, 5.5]]}

predictor = TensorFlowPredictor(
tfs_predictor.endpoint_name,
tfs_predictor.sagemaker_session,
serializer=sagemaker.predictor.csv_serializer,
tfs_predictor.endpoint_name, tfs_predictor.sagemaker_session, serializer=CSVSerializer(),
)

result = predictor.predict(input_data)
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/sagemaker/tensorflow/test_tfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import mock
import pytest
from mock import Mock, patch
from sagemaker.predictor import csv_serializer
from sagemaker.serializers import CSVSerializer
from sagemaker.tensorflow import TensorFlow
from sagemaker.tensorflow.model import TensorFlowModel, TensorFlowPredictor

Expand Down Expand Up @@ -323,7 +323,7 @@ def test_predictor_jsons(sagemaker_session):


def test_predictor_csv(sagemaker_session):
predictor = TensorFlowPredictor("endpoint", sagemaker_session, serializer=csv_serializer)
predictor = TensorFlowPredictor("endpoint", sagemaker_session, serializer=CSVSerializer())

mock_response(json.dumps(PREDICT_RESPONSE).encode("utf-8"), sagemaker_session)
result = predictor.predict([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
Expand Down Expand Up @@ -398,14 +398,14 @@ def test_predictor_regress(sagemaker_session):


def test_predictor_regress_bad_content_type(sagemaker_session):
predictor = TensorFlowPredictor("endpoint", sagemaker_session, csv_serializer)
predictor = TensorFlowPredictor("endpoint", sagemaker_session, CSVSerializer())

with pytest.raises(ValueError):
predictor.regress(REGRESS_INPUT)


def test_predictor_classify_bad_content_type(sagemaker_session):
predictor = TensorFlowPredictor("endpoint", sagemaker_session, csv_serializer)
predictor = TensorFlowPredictor("endpoint", sagemaker_session, CSVSerializer())

with pytest.raises(ValueError):
predictor.classify(CLASSIFY_INPUT)
Expand Down
Loading