Skip to content

Commit a101383

Browse files
committed
support json type
1 parent 2e6331e commit a101383

File tree

13 files changed

+345
-125
lines changed

13 files changed

+345
-125
lines changed

README.rst

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,4 +143,63 @@ Big fan of Pandas? We too! You can mix SQL and Pandas API together. Also you can
143143
print(cur.pl()) # Polars DataFrame
144144
145145
cur.execute('SELECT * FROM table(test)')
146-
print(cur.arrow()) # Arrow Table
146+
print(cur.arrow()) # Arrow Table
147+
148+
Json Type
149+
------------
150+
You can control named tuple and nested types deserialization by setting ``namedtuple_as_json``.To interpret these columns (i.e. named tuple, nested) as a json set ``namedtuple_as_json`` to ``True``
151+
152+
.. code-block:: python
153+
154+
from proton_driver import client
155+
import time
156+
157+
if __name__ == "__main__":
158+
c = client.Client(host='127.0.0.1', port=8463)
159+
160+
c.execute("DROP STREAM IF EXISTS test")
161+
c.execute("""CREATE STREAM test (
162+
j json,
163+
t tuple(code int,data tuple(id int,price float)),
164+
n nested(name string, locations array(string))
165+
)""", settings=dict(flatten_nested=0))
166+
167+
# add some data
168+
data = [(
169+
{
170+
"id": 341436,
171+
"info": {
172+
"name": "timeplus",
173+
"timezone": "Asia/Shanghai",
174+
},
175+
"arr": [43, 65, 87, 456, 356],
176+
},
177+
(200, (341436, 699.98)),
178+
[("proton", ["Ningbo", "Hangzhou", "Shanghai", "Beijing"])],
179+
)]
180+
c.execute("INSERT INTO test (j, t, n) VALUES", data)
181+
182+
# wait for 3 sec to make sure data available in historical store
183+
time.sleep(3)
184+
185+
# proton return json type as a named tuple by default.
186+
rv = c.execute(
187+
"SELECT j, t, n FROM table(test)",
188+
)
189+
190+
rv_as_json = c.execute(
191+
"SELECT j, t, n FROM table(test)",
192+
# Set namedtuple_as_json to True,to convert named tuple and nested types to json
193+
settings=dict(namedtuple_as_json=True)
194+
)
195+
196+
for record_as_json, record_as_tuple in zip(rv_as_json, rv):
197+
print(f"colnmu j:\n - json: {record_as_json[0]}\n - tuple: {record_as_tuple[0]}")
198+
print(f"colnmu t:\n - json: {record_as_json[1]}\n - tuple: {record_as_tuple[1]}")
199+
print(f"colnmu n:\n - json: {record_as_json[2]}\n - tuple: {record_as_tuple[2]}")
200+
201+
# For json type, you can cast it to json in SQL statement without namedtuple_as_json setting.
202+
rv_json = c.execute(
203+
"SELECT j::json FROM table(test)",
204+
)
205+
print(rv_json)

proton_driver/block.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .reader import read_varint, read_binary_uint8, read_binary_int32
22
from .varint import write_varint
33
from .writer import write_binary_uint8, write_binary_int32
4-
from .columns import nestedcolumn
4+
from .columns.util import get_inner_columns_with_types
55

66

77
class BlockInfo(object):
@@ -172,7 +172,7 @@ def _pure_mutate_dicts_to_rows(
172172
for name, type_ in columns_with_types:
173173
cwt = None
174174
if type_.startswith('nested'):
175-
cwt = nestedcolumn.get_columns_with_types(type_)
175+
cwt = get_inner_columns_with_types('nested', type_)
176176
columns_with_cwt.append((name, cwt))
177177

178178
for i, row in enumerate(data):

proton_driver/client.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ class Client(object):
4949
* ``quota_key`` -- A string to differentiate quotas when the user have
5050
keyed quotas configured on server.
5151
New in version *0.2.3*.
52+
* ``namedtuple_as_json`` -- Controls named tuple and nested types
53+
deserialization. To interpret these column as
54+
Python tuple set ``namedtuple_as_json``
55+
to ``False``. Default: False.
56+
New in version *0.2.12*.
5257
"""
5358

5459
available_client_settings = (
@@ -58,7 +63,8 @@ class Client(object):
5863
'use_numpy',
5964
'opentelemetry_traceparent',
6065
'opentelemetry_tracestate',
61-
'quota_key'
66+
'quota_key',
67+
'namedtuple_as_json'
6268
)
6369

6470
def __init__(self, *args, **kwargs):
@@ -85,6 +91,9 @@ def __init__(self, *args, **kwargs):
8591
),
8692
'quota_key': self.settings.pop(
8793
'quota_key', ''
94+
),
95+
'namedtuple_as_json': self.settings.pop(
96+
'namedtuple_as_json', False
8897
)
8998
}
9099

proton_driver/columns/arraycolumn.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,31 @@ class ArrayColumn(Column):
2828
py_types = (list, tuple)
2929

3030
def __init__(self, nested_column, **kwargs):
31-
self.size_column = UInt64Column()
31+
self.init_kwargs = kwargs
32+
self.size_column = UInt64Column(**kwargs)
3233
self.nested_column = nested_column
3334
self._write_depth_0_size = True
3435
super(ArrayColumn, self).__init__(**kwargs)
36+
self.null_value = []
3537

3638
def write_data(self, data, buf):
3739
# Column of Array(T) is stored in "compact" format and passed to server
3840
# wrapped into another Array without size of wrapper array.
39-
self.nested_column = ArrayColumn(self.nested_column)
41+
self.nested_column = ArrayColumn(
42+
self.nested_column, **self.init_kwargs
43+
)
4044
self.nested_column.nullable = self.nullable
4145
self.nullable = False
4246
self._write_depth_0_size = False
4347
self._write(data, buf)
4448

45-
def read_data(self, rows, buf):
46-
self.nested_column = ArrayColumn(self.nested_column)
49+
def read_data(self, n_rows, buf):
50+
self.nested_column = ArrayColumn(
51+
self.nested_column, **self.init_kwargs
52+
)
4753
self.nested_column.nullable = self.nullable
4854
self.nullable = False
49-
return self._read(rows, buf)[0]
55+
return self._read(n_rows, buf)[0]
5056

5157
def _write_sizes(self, value, buf):
5258
nulls_map = []
@@ -99,14 +105,19 @@ def _write_nulls_data(self, value, buf):
99105
self.nested_column._write_nulls_map(value, buf)
100106

101107
def _write(self, value, buf):
108+
value = self.prepare_items(value)
102109
self._write_sizes(value, buf)
103110
self._write_nulls_data(value, buf)
104111
self._write_data(value, buf)
105112

106113
def read_state_prefix(self, buf):
107-
return self.nested_column.read_state_prefix(buf)
114+
super(ArrayColumn, self).read_state_prefix(buf)
115+
116+
self.nested_column.read_state_prefix(buf)
108117

109118
def write_state_prefix(self, buf):
119+
super(ArrayColumn, self).write_state_prefix(buf)
120+
110121
self.nested_column.write_state_prefix(buf)
111122

112123
def _read(self, size, buf):
@@ -145,6 +156,6 @@ def _read(self, size, buf):
145156
return tuple(data)
146157

147158

148-
def create_array_column(spec, column_by_spec_getter):
159+
def create_array_column(spec, column_by_spec_getter, column_options):
149160
inner = spec[6:-1]
150-
return ArrayColumn(column_by_spec_getter(inner))
161+
return ArrayColumn(column_by_spec_getter(inner), **column_options)

proton_driver/columns/jsoncolumn.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from .base import Column
2+
from .stringcolumn import String
3+
from ..reader import read_binary_uint8, read_binary_str
4+
from ..util.compat import json
5+
from ..writer import write_binary_uint8
6+
7+
8+
class JsonColumn(Column):
9+
py_types = (dict, )
10+
11+
# No NULL value actually
12+
null_value = {}
13+
14+
def __init__(self, column_by_spec_getter, **kwargs):
15+
self.column_by_spec_getter = column_by_spec_getter
16+
self.string_column = String(**kwargs)
17+
super(JsonColumn, self).__init__(**kwargs)
18+
19+
def write_state_prefix(self, buf):
20+
# Read in binary format.
21+
# Write in text format.
22+
write_binary_uint8(1, buf)
23+
24+
def read_items(self, n_items, buf):
25+
read_binary_uint8(buf)
26+
spec = read_binary_str(buf)
27+
col = self.column_by_spec_getter(
28+
spec, dict(namedtuple_as_json=True)
29+
)
30+
col.read_state_prefix(buf)
31+
return col.read_data(n_items, buf)
32+
33+
def write_items(self, items, buf):
34+
items = [x if isinstance(x, str) else json.dumps(x) for x in items]
35+
self.string_column.write_items(items, buf)
36+
37+
38+
def create_json_column(spec, column_by_spec_getter, column_options):
39+
return JsonColumn(column_by_spec_getter, **column_options)

proton_driver/columns/nestedcolumn.py

Lines changed: 4 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,73 +1,10 @@
11

22
from .arraycolumn import create_array_column
3+
from .util import get_inner_spec
34

45

5-
def create_nested_column(spec, column_by_spec_getter):
6+
def create_nested_column(spec, column_by_spec_getter, column_options):
67
return create_array_column(
7-
'array(tuple({}))'.format(','.join(get_nested_columns(spec))),
8-
column_by_spec_getter=column_by_spec_getter
8+
'array(tuple({}))'.format(get_inner_spec('nested', spec)),
9+
column_by_spec_getter, column_options
910
)
10-
11-
12-
def get_nested_columns(spec):
13-
brackets = 0
14-
column_begin = 0
15-
16-
inner_spec = get_inner_spec(spec)
17-
nested_columns = []
18-
for i, x in enumerate(inner_spec + ','):
19-
if x == ',':
20-
if brackets == 0:
21-
nested_columns.append(inner_spec[column_begin:i])
22-
column_begin = i + 1
23-
elif x == '(':
24-
brackets += 1
25-
elif x == ')':
26-
brackets -= 1
27-
elif x == ' ':
28-
if brackets == 0:
29-
column_begin = i + 1
30-
return nested_columns
31-
32-
33-
def get_columns_with_types(spec):
34-
brackets = 0
35-
prev_comma = 0
36-
prev_space = 0
37-
38-
inner_spec = get_inner_spec(spec)
39-
columns_with_types = []
40-
41-
for i, x in enumerate(inner_spec + ','):
42-
if x == ',':
43-
if brackets == 0:
44-
columns_with_types.append((
45-
inner_spec[prev_comma:prev_space].strip(),
46-
inner_spec[prev_space:i]
47-
))
48-
prev_comma = i + 1
49-
elif x == '(':
50-
brackets += 1
51-
elif x == ')':
52-
brackets -= 1
53-
elif x == ' ':
54-
if brackets == 0:
55-
prev_space = i + 1
56-
return columns_with_types
57-
58-
59-
def get_inner_spec(spec):
60-
brackets = 0
61-
offset = len('nested')
62-
i = offset
63-
for i, ch in enumerate(spec[offset:], offset):
64-
if ch == '(':
65-
brackets += 1
66-
67-
elif ch == ')':
68-
brackets -= 1
69-
70-
if brackets == 0:
71-
break
72-
73-
return spec[offset + 1:i]

proton_driver/columns/service.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
IntervalSecondColumn
3333
)
3434
from .ipcolumn import IPv4Column, IPv6Column
35+
from .jsoncolumn import create_json_column
3536

3637

3738
column_by_type = {c.ch_type: c for c in [
@@ -64,7 +65,11 @@ def get_column_by_spec(spec, column_options, use_numpy=None):
6465
logger.warning('NumPy support is not implemented for %s. '
6566
'Using generic column', spec)
6667

67-
def create_column_with_options(x):
68+
def create_column_with_options(x, settings=None):
69+
if settings:
70+
client_settings = column_options['context'].client_settings
71+
client_settings.update(settings)
72+
column_options['context'].client_settings = client_settings
6873
return get_column_by_spec(x, column_options, use_numpy=use_numpy)
6974

7075
if spec == 'string' or spec.startswith('fixed_string'):
@@ -80,13 +85,23 @@ def create_column_with_options(x):
8085
return create_decimal_column(spec, column_options)
8186

8287
elif spec.startswith('array'):
83-
return create_array_column(spec, create_column_with_options)
88+
return create_array_column(
89+
spec, create_column_with_options, column_options
90+
)
8491

8592
elif spec.startswith('tuple'):
86-
return create_tuple_column(spec, create_column_with_options)
93+
return create_tuple_column(
94+
spec, create_column_with_options, column_options
95+
)
96+
elif spec.startswith('json'):
97+
return create_json_column(
98+
spec, create_column_with_options, column_options
99+
)
87100

88101
elif spec.startswith('nested'):
89-
return create_nested_column(spec, create_column_with_options)
102+
return create_nested_column(
103+
spec, create_column_with_options, column_options
104+
)
90105

91106
elif spec.startswith('nullable'):
92107
return create_nullable_column(spec, create_column_with_options)

0 commit comments

Comments
 (0)