import io
import struct

import boto3

import AmazonAIAlgorithmsIO_pb2
from record_pb2 import Record

# Magic number that opens every record in the RecordIO framing used below.
KMAGIC = 0xced7230a


def write_recordio(f, data):
    """Write one RecordIO-framed record: magic, length, payload, zero padding."""
    length = len(data)
    f.write(struct.pack('I', KMAGIC))
    f.write(struct.pack('I', length))
    # Pad the payload so the next record starts on a 4-byte boundary.
    upper_align = ((length + 3) >> 2) << 2
    padding = bytes(upper_align - length)
    f.write(data)
    f.write(padding)


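# Framing sketch (added illustration, not in the original helpers): a 5-byte
# payload is stored as 4 bytes of magic, 4 bytes of length, the payload, and
# 3 bytes of padding, so the next record starts on a 4-byte boundary:
#
#     buf = io.BytesIO()
#     write_recordio(buf, b'hello')
#     assert buf.getvalue()[8:13] == b'hello'
#     assert len(buf.getvalue()) == 16

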
def list_to_record_bytes(values, keys=None, label=None, feature_size=None):
    """Serialize one observation as a SageMaker protobuf Record.

    Dense tensors pass `values` only; sparse tensors pass the non-zero
    `values` together with their `keys` and the overall `feature_size`.
    """
    record = Record()

    record.features['values'].float32_tensor.values.extend(values)

    if keys is not None:
        if feature_size is None:
            raise ValueError("For sparse tensors the feature size must be specified.")
        record.features['values'].float32_tensor.keys.extend(keys)

    if feature_size is not None:
        record.features['values'].float32_tensor.shape.extend([feature_size])

    if label is not None:
        record.label['values'].float32_tensor.values.extend([label])

    return record.SerializeToString()


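# Usage sketch (added illustration): a dense record carries only values,
# while a sparse record carries the non-zero values, their keys, and the
# full feature size:
#
#     dense = list_to_record_bytes([1.0, 2.0, 3.0], label=1.0)
#     sparse = list_to_record_bytes([0.5, 0.25], keys=[7, 42], feature_size=100)

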
def read_next(f):
    """Read one RecordIO-framed record, or return None at end of file."""
    raw_bytes = f.read(4)
    if not raw_bytes:
        return None
    m = struct.unpack('I', raw_bytes)[0]
    if m != KMAGIC:
        raise ValueError("Incorrect encoding")
    length = struct.unpack('I', f.read(4))[0]
    # Consume the payload plus its padding up to the next 4-byte boundary.
    upper_align = ((length + 3) >> 2) << 2
    data = f.read(upper_align)
    return data[:length]


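# The generator below is an added convenience sketch, not part of the
# original helpers: it drains a RecordIO stream by calling read_next()
# until end of file.
def read_records(f):
    """Yield every record payload in a RecordIO stream."""
    while True:
        data = read_next(f)
        if data is None:
            return
        yield data

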
def to_proto(f, labels, vectors):
    """Write labeled vectors to `f` as RecordIO-framed protobuf records."""
    for label, vec in zip(labels, vectors):
        record = AmazonAIAlgorithmsIO_pb2.Record()
        record.values.extend(vec)
        record.label = label
        write_recordio(f, record.SerializeToString())


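# Usage sketch (added illustration):
#
#     buf = io.BytesIO()
#     to_proto(buf, labels=[0.0, 1.0], vectors=[[1.0, 2.0], [3.0, 4.0]])

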
def to_libsvm(f, labels, values):
    """Write labeled vectors to `f` in LIBSVM format (1-indexed features)."""
    lines = []
    for label, vec in zip(labels, values):
        features = ' '.join('{}:{}'.format(i + 1, el) for i, el in enumerate(vec))
        lines.append('{} {}'.format(label, features))
    f.write('\n'.join(lines))
    return f


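# Output sketch (added illustration): to_libsvm(f, [1.0], [[0.5, 0.25]])
# writes the single line "1.0 1:0.5 2:0.25".

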
def write_to_s3(fobj, bucket, key):
    """Upload a file-like object to s3://bucket/key using the default boto3 session."""
    return boto3.Session().resource('s3').Bucket(bucket).Object(key).upload_fileobj(fobj)


def upload_to_s3(partition_name, partition, bucket):
    """Serialize one (vectors, labels) partition and upload it to S3."""
    labels = [t.tolist() for t in partition[1]]
    vectors = [t.tolist() for t in partition[0]]
    f = io.BytesIO()
    to_proto(f, labels, vectors)
    f.seek(0)
    key = "{}/examples".format(partition_name)
    url = 's3n://{}/{}'.format(bucket, key)
    print('Writing to {}'.format(url))
    write_to_s3(f, bucket, key)
    print('Done writing to {}'.format(url))


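# Expected partition shape (added illustration): a pair of numpy arrays
# (vectors, labels), e.g.
#
#     upload_to_s3('train', (train_X, train_y), bucket='my-bucket')

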
def convert_data(partitions, bucket):
    """Convert and upload each named (vectors, labels) partition to S3."""
    for partition_name, partition in partitions:
        print('{}: {} {}'.format(partition_name, partition[0].shape, partition[1].shape))
        upload_to_s3(partition_name, partition, bucket)


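if __name__ == '__main__':
    # Local smoke test (added sketch; touches no S3 resources): round-trip
    # two records through the RecordIO framing in memory.
    buf = io.BytesIO()
    write_recordio(buf, b'first')
    write_recordio(buf, b'second')
    buf.seek(0)
    assert read_next(buf) == b'first'
    assert read_next(buf) == b'second'
    assert read_next(buf) is None
    print('RecordIO round-trip OK')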