Skip to content

Commit 5406fc3

Browse files
authored
INTPYTHON-165 Refactor nested data handling (#245)
1 parent 8fcc9ba commit 5406fc3

File tree

14 files changed

+811
-1119
lines changed

14 files changed

+811
-1119
lines changed

.github/workflows/benchmark.yml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ jobs:
5151
tox -e benchmark -- --set-commit-hash $(git rev-parse HEAD)
5252
}
5353
54-
pip install asv
54+
pip install asv virtualenv
5555
asv machine --yes
5656
git fetch origin main:main
5757
git update-ref refs/bm/pr HEAD
@@ -66,7 +66,8 @@ jobs:
6666
6767
- name: Compare benchmarks
6868
run: |
69-
asv compare refs/bm/merge-target refs/bm/pr --
69+
asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr --
70+
7071
- name: Fail if any benchmarks have slowed down too much
7172
run: |
72-
! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr | grep -q "got worse"
73+
! asv compare --factor 1.2 --split refs/bm/merge-target refs/bm/pr 2> /dev/null | grep -q "got worse"

.github/workflows/release-python.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ jobs:
123123
export LIBBSON_INSTALL_DIR="$(pwd)/libbson"
124124
python -m pip install dist/*.gz
125125
cd ..
126-
python -c "from pymongoarrow.lib import process_bson_stream"
126+
python -c "from pymongoarrow.lib import libbson_version"
127127
128128
- uses: actions/upload-artifact@v4
129129
with:

.pre-commit-config.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ repos:
9494
stages: [manual]
9595
args: ["--no-strict-imports"]
9696

97+
- repo: https://github.com/MarcoGorelli/cython-lint
98+
rev: v0.16.2
99+
hooks:
100+
- id: cython-lint
101+
args: ["--no-pycodestyle"]
102+
97103
- repo: https://github.com/codespell-project/codespell
98104
rev: "v2.2.6"
99105
hooks:

bindings/python/asv.conf.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,6 @@
1010
"N_DOCS": ["20000", "1000"]
1111
}
1212
},
13-
"environment_type": "virtualenv"
13+
"environment_type": "virtualenv",
14+
"plugins": ["virtualenv"]
1415
}

bindings/python/benchmarks/benchmarks.py

Lines changed: 16 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -114,11 +114,6 @@ class Read(ABC):
114114
def setup(self):
115115
raise NotImplementedError
116116

117-
# We need this because the naive methods don't always convert nested objects.
118-
@staticmethod # noqa: B027
119-
def exercise_table(table):
120-
pass
121-
122117
def time_conventional_ndarray(self):
123118
collection = db.benchmark
124119
cursor = collection.find(projection={"_id": 0})
@@ -147,13 +142,11 @@ def time_to_pandas(self):
147142
def time_conventional_arrow(self):
148143
c = db.benchmark
149144
f = list(c.find({}, projection={"_id": 0}))
150-
table = pa.Table.from_pylist(f)
151-
self.exercise_table(table)
145+
pa.Table.from_pylist(f)
152146

153147
def time_to_arrow(self):
154148
c = db.benchmark
155-
table = find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})
156-
self.exercise_table(table)
149+
find_arrow_all(c, {}, schema=self.schema, projection={"_id": 0})
157150

158151
def time_conventional_polars(self):
159152
collection = db.benchmark
@@ -211,27 +204,25 @@ def setup(self):
211204
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
212205
)
213206

214-
# We need this because the naive methods don't always convert nested objects.
215-
@staticmethod
216-
def exercise_table(table):
217-
[
218-
[[n for n in i.values] if isinstance(i, pa.ListScalar) else i for i in column]
219-
for column in table.columns
220-
]
221-
222-
# All of the following tests are being skipped because NumPy/Pandas do not work with nested arrays.
207+
# All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested arrays.
223208
def time_to_numpy(self):
224209
pass
225210

226211
def time_to_pandas(self):
227212
pass
228213

214+
def time_to_polars(self):
215+
pass
216+
229217
def time_conventional_ndarray(self):
230218
pass
231219

232220
def time_conventional_pandas(self):
233221
pass
234222

223+
def time_conventional_polars(self):
224+
pass
225+
235226

236227
class ProfileReadDocument(Read):
237228
schema = Schema(
@@ -260,27 +251,25 @@ def setup(self):
260251
% (N_DOCS, len(BSON.encode(base_dict)) // 1024, len(base_dict))
261252
)
262253

263-
# We need this because the naive methods don't always convert nested objects.
264-
@staticmethod
265-
def exercise_table(table):
266-
[
267-
[[n for n in i.values()] if isinstance(i, pa.StructScalar) else i for i in column]
268-
for column in table.columns
269-
]
270-
271-
# All of the following tests are being skipped because NumPy/Pandas do not work with nested documents.
254+
# All of the following tests are being skipped because NumPy/Pandas/Polars do not work with nested documents.
272255
def time_to_numpy(self):
273256
pass
274257

275258
def time_to_pandas(self):
276259
pass
277260

261+
def time_to_polars(self):
262+
pass
263+
278264
def time_conventional_ndarray(self):
279265
pass
280266

281267
def time_conventional_pandas(self):
282268
pass
283269

270+
def time_conventional_polars(self):
271+
pass
272+
284273

285274
class ProfileReadSmall(Read):
286275
schema = Schema({"x": pa.int64(), "y": pa.float64()})

bindings/python/pymongoarrow/api.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,6 @@
3838
from pymongoarrow.schema import Schema
3939
from pymongoarrow.types import _validate_schema, get_numpy_type
4040

41-
try: # noqa: SIM105
42-
from pymongoarrow.lib import process_bson_stream
43-
except ImportError:
44-
pass
45-
4641
__all__ = [
4742
"aggregate_arrow_all",
4843
"find_arrow_all",
@@ -93,7 +88,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):
9388
:Returns:
9489
An instance of :class:`pyarrow.Table`.
9590
"""
96-
context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
91+
context = PyMongoArrowContext(schema, codec_options=collection.codec_options)
9792

9893
for opt in ("cursor_type",):
9994
if kwargs.pop(opt, None):
@@ -108,7 +103,7 @@ def find_arrow_all(collection, query, *, schema=None, **kwargs):
108103

109104
raw_batch_cursor = collection.find_raw_batches(query, **kwargs)
110105
for batch in raw_batch_cursor:
111-
process_bson_stream(batch, context)
106+
context.process_bson_stream(batch)
112107

113108
return context.finish()
114109

@@ -131,7 +126,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):
131126
:Returns:
132127
An instance of :class:`pyarrow.Table`.
133128
"""
134-
context = PyMongoArrowContext.from_schema(schema, codec_options=collection.codec_options)
129+
context = PyMongoArrowContext(schema, codec_options=collection.codec_options)
135130

136131
if pipeline and ("$out" in pipeline[-1] or "$merge" in pipeline[-1]):
137132
msg = (
@@ -152,7 +147,7 @@ def aggregate_arrow_all(collection, pipeline, *, schema=None, **kwargs):
152147

153148
raw_batch_cursor = collection.aggregate_raw_batches(pipeline, **kwargs)
154149
for batch in raw_batch_cursor:
155-
process_bson_stream(batch, context)
150+
context.process_bson_stream(batch)
156151

157152
return context.finish()
158153

bindings/python/pymongoarrow/context.py

Lines changed: 64 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -11,55 +11,15 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14-
from bson.codec_options import DEFAULT_CODEC_OPTIONS
15-
from pyarrow import Table, timestamp
14+
from pyarrow import ListArray, StructArray, Table
1615

1716
from pymongoarrow.types import _BsonArrowTypes, _get_internal_typemap
1817

19-
try:
20-
from pymongoarrow.lib import (
21-
BinaryBuilder,
22-
BoolBuilder,
23-
CodeBuilder,
24-
Date32Builder,
25-
Date64Builder,
26-
DatetimeBuilder,
27-
Decimal128Builder,
28-
DocumentBuilder,
29-
DoubleBuilder,
30-
Int32Builder,
31-
Int64Builder,
32-
ListBuilder,
33-
NullBuilder,
34-
ObjectIdBuilder,
35-
StringBuilder,
36-
)
37-
38-
_TYPE_TO_BUILDER_CLS = {
39-
_BsonArrowTypes.int32: Int32Builder,
40-
_BsonArrowTypes.int64: Int64Builder,
41-
_BsonArrowTypes.double: DoubleBuilder,
42-
_BsonArrowTypes.datetime: DatetimeBuilder,
43-
_BsonArrowTypes.objectid: ObjectIdBuilder,
44-
_BsonArrowTypes.decimal128: Decimal128Builder,
45-
_BsonArrowTypes.string: StringBuilder,
46-
_BsonArrowTypes.bool: BoolBuilder,
47-
_BsonArrowTypes.document: DocumentBuilder,
48-
_BsonArrowTypes.array: ListBuilder,
49-
_BsonArrowTypes.binary: BinaryBuilder,
50-
_BsonArrowTypes.code: CodeBuilder,
51-
_BsonArrowTypes.date32: Date32Builder,
52-
_BsonArrowTypes.date64: Date64Builder,
53-
_BsonArrowTypes.null: NullBuilder,
54-
}
55-
except ImportError:
56-
pass
57-
5818

5919
class PyMongoArrowContext:
6020
"""A context for converting BSON-formatted data to an Arrow Table."""
6121

62-
def __init__(self, schema, builder_map, codec_options=None):
22+
def __init__(self, schema, codec_options=None):
6323
"""Initialize the context.
6424
6525
:Parameters:
@@ -68,57 +28,75 @@ def __init__(self, schema, builder_map, codec_options=None):
6828
:class:`~pymongoarrow.builders._BuilderBase` instances.
6929
"""
7030
self.schema = schema
71-
self.builder_map = builder_map
7231
if self.schema is None and codec_options is not None:
7332
self.tzinfo = codec_options.tzinfo
7433
else:
7534
self.tzinfo = None
35+
schema_map = {}
36+
if self.schema is not None:
37+
str_type_map = _get_internal_typemap(schema.typemap)
38+
_parse_types(str_type_map, schema_map, self.tzinfo)
7639

77-
@classmethod
78-
def from_schema(cls, schema, codec_options=DEFAULT_CODEC_OPTIONS):
79-
"""Initialize the context from a :class:`~pymongoarrow.schema.Schema`
80-
instance.
40+
# Delayed import to prevent import errors for unbuilt library.
41+
from pymongoarrow.lib import BuilderManager
8142

82-
:Parameters:
83-
- `schema`: Instance of :class:`~pymongoarrow.schema.Schema`.
84-
- `codec_options` (optional): An instance of
85-
:class:`~bson.codec_options.CodecOptions`.
86-
"""
87-
if schema is None:
88-
return cls(schema, {}, codec_options)
89-
90-
builder_map = {}
91-
tzinfo = codec_options.tzinfo
92-
str_type_map = _get_internal_typemap(schema.typemap)
93-
for fname, ftype in str_type_map.items():
94-
builder_cls = _TYPE_TO_BUILDER_CLS[ftype]
95-
encoded_fname = fname.encode("utf-8")
96-
97-
# special-case initializing builders for parameterized types
98-
if builder_cls == DatetimeBuilder:
99-
arrow_type = schema.typemap[fname]
100-
if tzinfo is not None and arrow_type.tz is None:
101-
arrow_type = timestamp(arrow_type.unit, tz=tzinfo)
102-
builder_map[encoded_fname] = DatetimeBuilder(dtype=arrow_type)
103-
elif builder_cls == DocumentBuilder:
104-
arrow_type = schema.typemap[fname]
105-
builder_map[encoded_fname] = DocumentBuilder(arrow_type, tzinfo)
106-
elif builder_cls == ListBuilder:
107-
arrow_type = schema.typemap[fname]
108-
builder_map[encoded_fname] = ListBuilder(arrow_type, tzinfo)
109-
elif builder_cls == BinaryBuilder:
110-
subtype = schema.typemap[fname].subtype
111-
builder_map[encoded_fname] = BinaryBuilder(subtype)
112-
else:
113-
builder_map[encoded_fname] = builder_cls()
114-
return cls(schema, builder_map)
43+
self.manager = BuilderManager(schema_map, self.schema is not None, self.tzinfo)
44+
45+
def process_bson_stream(self, stream):
46+
self.manager.process_bson_stream(stream, len(stream))
11547

11648
def finish(self):
117-
arrays = []
118-
names = []
119-
for fname, builder in self.builder_map.items():
120-
arrays.append(builder.finish())
121-
names.append(fname.decode("utf-8"))
49+
array_map = _parse_builder_map(self.manager.finish())
50+
arrays = list(array_map.values())
12251
if self.schema is not None:
12352
return Table.from_arrays(arrays=arrays, schema=self.schema.to_arrow())
124-
return Table.from_arrays(arrays=arrays, names=names)
53+
return Table.from_arrays(arrays=arrays, names=list(array_map.keys()))
54+
55+
56+
def _parse_builder_map(builder_map):
    """Finish every builder in ``builder_map`` and fold nested entries into their parents.

    The mapping is modified in place: each builder value is replaced by the
    result of finishing it, and entries that were absorbed into a parent
    struct or list are removed afterwards.

    Child entries are addressed by name: ``"<parent>.<field>"`` for document
    fields and ``"<parent>[]"`` for the items of an array.

    :Parameters:
      - `builder_map`: An ordered mapping of field name to builder. Each
        builder exposes ``type_marker`` and ``finish()``.

    :Returns:
      The same ``builder_map`` object, now mapping top-level field names to
      finished arrays.
    """
    document_marker = _BsonArrowTypes.document.value
    array_marker = _BsonArrowTypes.array.value
    absorbed = []

    # Walk from the last entry to the first. A parent reads its children's
    # already-finished arrays out of the map, so children must be visited
    # before the parent (presumably children are always inserted after their
    # parent -- verify against the builder manager).
    for field, builder in reversed(builder_map.items()):
        marker = builder.type_marker

        if marker == document_marker:
            # For a document builder, finish() yields the child field names.
            child_names = builder.finish()
            child_keys = [f"{field}.{child}" for child in child_names]
            child_arrays = [builder_map[child_key] for child_key in child_keys]
            builder_map[field] = StructArray.from_arrays(child_arrays, names=child_names)
            absorbed.extend(child_keys)
            continue

        if marker == array_marker:
            item_key = field + "[]"
            absorbed.append(item_key)
            items = builder_map[item_key]
            # The list builder's finish() result is passed as the offsets
            # argument; the finished child array supplies the values.
            builder_map[field] = ListArray.from_arrays(builder.finish(), items)
            continue

        builder_map[field] = builder.finish()

    # Drop the entries that now live inside a parent struct/list array.
    for child_key in absorbed:
        builder_map.pop(child_key, None)

    return builder_map
80+
81+
82+
def _parse_types(str_type_map, schema_map, tzinfo):
    """Flatten a possibly nested type map into ``schema_map``.

    Every entry of ``str_type_map`` is copied into ``schema_map``. For
    document and array entries the nested types are also registered, under
    the names ``"<parent>.<field>"`` (one per struct field) and
    ``"<parent>[]"`` (the list's value type) respectively, recursing as deep
    as the nesting goes.

    :Parameters:
      - `str_type_map`: Mapping of field name to a ``(type marker, arrow type)``
        pair.
      - `schema_map`: Output mapping, updated in place with one
        ``(type marker, arrow type)`` tuple per (nested) field name.
      - `tzinfo`: Not used directly here; only forwarded to recursive calls.
    """
    document_marker = _BsonArrowTypes.document.value
    array_marker = _BsonArrowTypes.array.value

    for name, (marker, arrow_type) in str_type_map.items():
        schema_map[name] = (marker, arrow_type)

        # Collect the raw arrow types of the direct children, if any.
        if marker == document_marker:
            struct_fields = (arrow_type[index] for index in range(arrow_type.num_fields))
            children = {f"{name}.{child.name}": child.type for child in struct_fields}
        elif marker == array_marker:
            children = {f"{name}[]": arrow_type.value_type}
        else:
            # Not a nested type: nothing further to register.
            continue

        # The recursive call consumes name -> (marker, arrow type) pairs, so
        # the raw child types are first run through _get_internal_typemap.
        _parse_types(_get_internal_typemap(children), schema_map, tzinfo)

0 commit comments

Comments
 (0)