Skip to content

Commit 7ef5771

Browse files
meownoid and remoeller
authored and committed
Implemented write_spmatrix_to_sparse_tensor (#28)
Add support for serializing scipy.sparse matrices
1 parent 157d867 commit 7ef5771

File tree

5 files changed

+260
-2
lines changed

5 files changed

+260
-2
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ examples/tensorflow/distributed_mnist/data
1919
*.iml
2020
doc/_build
2121
**/.DS_Store
22+
venv/

setup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def read(fname):
3131
],
3232

3333
# Declare minimal set for installation
34-
install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1'],
34+
install_requires=['boto3>=1.4.8', 'numpy>=1.9.0', 'protobuf>=3.1', 'scipy>=1.0.0'],
3535

3636
extras_require={
3737
'test': ['tox', 'flake8', 'pytest', 'pytest-cov', 'pytest-xdist',

src/sagemaker/amazon/common.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import sys
1616

1717
import numpy as np
18+
from scipy.sparse import issparse
1819

1920
from sagemaker.amazon.record_pb2 import Record
2021

@@ -64,6 +65,24 @@ def _write_label_tensor(resolved_type, record, scalar):
6465
record.label["values"].float32_tensor.values.extend([scalar])
6566

6667

68+
def _write_keys_tensor(resolved_type, record, vector):
69+
if resolved_type == "Int32":
70+
record.features["values"].int32_tensor.keys.extend(vector)
71+
elif resolved_type == "Float64":
72+
record.features["values"].float64_tensor.keys.extend(vector)
73+
elif resolved_type == "Float32":
74+
record.features["values"].float32_tensor.keys.extend(vector)
75+
76+
77+
def _write_shape(resolved_type, record, scalar):
78+
if resolved_type == "Int32":
79+
record.features["values"].int32_tensor.shape.extend([scalar])
80+
elif resolved_type == "Float64":
81+
record.features["values"].float64_tensor.shape.extend([scalar])
82+
elif resolved_type == "Float32":
83+
record.features["values"].float32_tensor.shape.extend([scalar])
84+
85+
6786
def write_numpy_to_dense_tensor(file, array, labels=None):
6887
"""Writes a numpy array to a dense tensor"""
6988

@@ -89,6 +108,46 @@ def write_numpy_to_dense_tensor(file, array, labels=None):
89108
_write_recordio(file, record.SerializeToString())
90109

91110

111+
def write_spmatrix_to_sparse_tensor(file, array, labels=None):
    """Write a scipy sparse matrix to *file* as RecordIO-wrapped sparse tensors.

    Each row of the matrix becomes one protobuf ``Record`` carrying the row's
    non-zero values, their column indices (keys) and the row width (shape),
    plus an optional scalar label.

    Args:
        file: binary file-like object open for writing.
        array: two-dimensional ``scipy.sparse`` matrix.
        labels: optional one-dimensional numpy array of per-row labels.

    Raises:
        TypeError: if ``array`` is not a scipy sparse matrix.
        ValueError: if ``array`` is not 2-D, or ``labels`` is not a vector
            whose length matches one of the matrix dimensions.
    """
    if not issparse(array):
        raise TypeError("Array must be sparse")

    # Validate shape of array and labels, resolve array and label types
    if not len(array.shape) == 2:
        raise ValueError("Array must be a Matrix")
    if labels is not None:
        if not len(labels.shape) == 1:
            raise ValueError("Labels must be a Vector")
        # NOTE(review): this accepts a label vector matching *either* dimension,
        # so a label count equal to the column count (but not the row count)
        # slips through validation -- confirm whether a strict
        # ``labels.shape[0] == array.shape[0]`` check is intended.
        if labels.shape[0] not in array.shape:
            raise ValueError("Label shape {} not compatible with array shape {}".format(
                labels.shape, array.shape))
        resolved_label_type = _resolve_type(labels.dtype)
    resolved_type = _resolve_type(array.dtype)

    # CSR layout gives contiguous per-row slices of data/indices via indptr.
    csr_array = array.tocsr()
    n_rows, n_cols = csr_array.shape

    record = Record()
    for row_idx in range(n_rows):
        record.Clear()
        # Slice the CSR buffers directly instead of materializing a one-row
        # matrix with getrow() -- same values and column indices, but without
        # a per-row matrix allocation.
        row_start = csr_array.indptr[row_idx]
        row_end = csr_array.indptr[row_idx + 1]

        # Write values
        _write_feature_tensor(resolved_type, record, csr_array.data[row_start:row_end])
        # Write keys (column indices, widened to uint64 for the protobuf field)
        _write_keys_tensor(resolved_type, record,
                           csr_array.indices[row_start:row_end].astype(np.uint64))

        # Write labels
        if labels is not None:
            _write_label_tensor(resolved_label_type, record, labels[row_idx])

        # Write shape (row width, i.e. the number of columns)
        _write_shape(resolved_type, record, n_cols)

        _write_recordio(file, record.SerializeToString())
149+
150+
92151
def read_records(file):
93152
"""Eagerly read a collection of amazon Record protobuf objects from file."""
94153
records = []

tests/unit/test_common.py

Lines changed: 198 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@
1313
import numpy as np
1414
import tempfile
1515
import pytest
16+
import itertools
17+
from scipy.sparse import coo_matrix
1618
from sagemaker.amazon.common import (record_deserializer, write_numpy_to_dense_tensor, _read_recordio,
17-
numpy_to_record_serializer)
19+
numpy_to_record_serializer, write_spmatrix_to_sparse_tensor)
1820
from sagemaker.amazon.record_pb2 import Record
1921

2022

@@ -131,3 +133,198 @@ def test_invalid_label():
131133
with tempfile.TemporaryFile() as f:
132134
with pytest.raises(ValueError):
133135
write_numpy_to_dense_tensor(f, array, label_data)
136+
137+
138+
def _check_dense_spmatrix(array, expected_values, expected_keys, tensor_name):
    """Serialize *array* and verify each record against the expected rows."""
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, array)
        f.seek(0)
        for record_data, values, keys in zip(_read_recordio(f), expected_values, expected_keys):
            record = Record()
            record.ParseFromString(record_data)
            tensor = getattr(record.features["values"], tensor_name)
            assert tensor.values == values
            assert tensor.keys == keys
            assert tensor.shape == [len(values)]


def test_dense_float_write_spmatrix_to_sparse_tensor():
    array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data))
    _check_dense_spmatrix(array, array_data, keys_data, "float64_tensor")


def test_dense_float32_write_spmatrix_to_sparse_tensor():
    array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data).astype(np.dtype('float32')))
    _check_dense_spmatrix(array, array_data, keys_data, "float32_tensor")


def test_dense_int_write_spmatrix_to_sparse_tensor():
    array_data = [[1.0, 2.0, 3.0], [10.0, 20.0, 30.0]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data).astype(np.dtype('int')))
    _check_dense_spmatrix(array, array_data, keys_data, "int32_tensor")
181+
182+
183+
def _check_labelled_spmatrix(array, expected_values, expected_keys, label_data, tensor_name):
    """Serialize *array* with labels and verify each record, including its label."""
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, array, label_data)
        f.seek(0)
        for record_data, values, keys, label in zip(
            _read_recordio(f),
            expected_values,
            expected_keys,
            label_data
        ):
            record = Record()
            record.ParseFromString(record_data)
            tensor = getattr(record.features["values"], tensor_name)
            assert tensor.values == values
            assert tensor.keys == keys
            assert record.label["values"].int32_tensor.values == [label]
            assert tensor.shape == [len(values)]


def test_dense_int_spmatrix_to_sparse_label():
    array_data = [[1, 2, 3], [10, 20, 3]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data))
    # One label per row: the matrix has two rows. (The previous 3-element
    # vector only passed validation because it matched the *column* count,
    # and its last label was silently dropped.)
    label_data = np.array([99, 98])
    _check_labelled_spmatrix(array, array_data, keys_data, label_data, "int32_tensor")


def test_dense_float32_spmatrix_to_sparse_label():
    array_data = [[1, 2, 3], [10, 20, 3]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data).astype('float32'))
    # One label per row (see note in test_dense_int_spmatrix_to_sparse_label).
    label_data = np.array([99, 98])
    _check_labelled_spmatrix(array, array_data, keys_data, label_data, "float32_tensor")


def test_dense_float64_spmatrix_to_sparse_label():
    array_data = [[1, 2, 3], [10, 20, 3]]
    keys_data = [[0, 1, 2], [0, 1, 2]]
    array = coo_matrix(np.array(array_data).astype('float64'))
    # One label per row (see note in test_dense_int_spmatrix_to_sparse_label).
    label_data = np.array([99, 98])
    _check_labelled_spmatrix(array, array_data, keys_data, label_data, "float64_tensor")
247+
248+
249+
def test_invalid_sparse_label():
    """A label vector whose length matches neither matrix dimension is rejected."""
    matrix = coo_matrix(np.array([[1, 2, 3], [10, 20, 3]]))
    bad_labels = np.array([99, 98, 97, 1000]).astype(np.dtype('float64'))
    with tempfile.TemporaryFile() as f:
        with pytest.raises(ValueError):
            write_spmatrix_to_sparse_tensor(f, matrix, bad_labels)
256+
257+
258+
def _build_sparse_coo(array_data, keys_data, dtype):
    """Assemble a COO matrix from per-row value lists and column-index lists."""
    values = list(itertools.chain.from_iterable(array_data))
    col_indices = list(itertools.chain.from_iterable(keys_data))
    row_indices = list(itertools.chain.from_iterable(
        [row] * len(keys) for row, keys in enumerate(keys_data)))
    return coo_matrix((values, (row_indices, col_indices)), dtype=dtype)


def _check_sparse_roundtrip(array_data, keys_data, dtype, tensor_name, n_cols):
    """Serialize a genuinely sparse matrix and verify every record."""
    array = _build_sparse_coo(array_data, keys_data, dtype)
    with tempfile.TemporaryFile() as f:
        write_spmatrix_to_sparse_tensor(f, array)
        f.seek(0)
        for record_data, values, keys in zip(_read_recordio(f), array_data, keys_data):
            record = Record()
            record.ParseFromString(record_data)
            tensor = getattr(record.features["values"], tensor_name)
            assert tensor.values == values
            assert tensor.keys == keys
            assert tensor.shape == [n_cols]


def test_sparse_float_write_spmatrix_to_sparse_tensor():
    _check_sparse_roundtrip(
        [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]],
        [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]],
        'float64', "float64_tensor", 4)


def test_sparse_float32_write_spmatrix_to_sparse_tensor():
    _check_sparse_roundtrip(
        [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]],
        [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]],
        'float32', "float32_tensor", 4)


def test_sparse_int_write_spmatrix_to_sparse_tensor():
    _check_sparse_roundtrip(
        [[1.0, 2.0], [10.0, 30.0], [100.0, 200.0, 300.0, 400.0], [1000.0, 2000.0, 3000.0]],
        [[0, 1], [1, 2], [0, 1, 2, 3], [0, 2, 3]],
        'int', "int32_tensor", 4)
322+
323+
324+
def test_dense_to_sparse():
    """Passing a dense ndarray (not a scipy sparse matrix) raises TypeError."""
    dense = np.array([[1, 2, 3], [10, 20, 3]])
    labels = np.array([99, 98, 97]).astype(np.dtype('float64'))
    with tempfile.TemporaryFile() as f:
        with pytest.raises(TypeError):
            write_spmatrix_to_sparse_tensor(f, dense, labels)

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ exclude =
2121
*pb2.py
2222
.tox
2323
tests/data/
24+
venv/
2425
max-complexity = 10
2526

2627
[testenv]

0 commit comments

Comments
 (0)