Commit 498a70a

Merge branch 'main' into trace-annotation-SessionPools
2 parents: 5bf38b7 + ccae6e0

4 files changed: +260 -79 lines changed


google/cloud/spanner_v1/_helpers.py

Lines changed: 131 additions & 47 deletions
@@ -266,66 +266,69 @@ def _parse_value_pb(value_pb, field_type, field_name, column_info=None):
     :returns: value extracted from value_pb
     :raises ValueError: if unknown type is passed
     """
+    decoder = _get_type_decoder(field_type, field_name, column_info)
+    return _parse_nullable(value_pb, decoder)
+
+
+def _get_type_decoder(field_type, field_name, column_info=None):
+    """Returns a function that converts a Value protobuf to cell data.
+
+    :type field_type: :class:`~google.cloud.spanner_v1.types.Type`
+    :param field_type: type code for the value
+
+    :type field_name: str
+    :param field_name: column name
+
+    :type column_info: dict
+    :param column_info: (Optional) dict of column name and column information.
+        An object where column names as keys and custom objects as corresponding
+        values for deserialization. It's specifically useful for data types like
+        protobuf where deserialization logic is on user-specific code. When provided,
+        the custom object enables deserialization of backend-received column data.
+        If not provided, data remains serialized as bytes for Proto Messages and
+        integer for Proto Enums.
+
+    :rtype: a function that takes a single protobuf value as an input argument
+    :returns: a function that can be used to extract a value from a protobuf value
+    :raises ValueError: if unknown type is passed
+    """
     type_code = field_type.code
-    if value_pb.HasField("null_value"):
-        return None
     if type_code == TypeCode.STRING:
-        return value_pb.string_value
+        return _parse_string
     elif type_code == TypeCode.BYTES:
-        return value_pb.string_value.encode("utf8")
+        return _parse_bytes
     elif type_code == TypeCode.BOOL:
-        return value_pb.bool_value
+        return _parse_bool
     elif type_code == TypeCode.INT64:
-        return int(value_pb.string_value)
+        return _parse_int64
     elif type_code == TypeCode.FLOAT64:
-        if value_pb.HasField("string_value"):
-            return float(value_pb.string_value)
-        else:
-            return value_pb.number_value
+        return _parse_float
     elif type_code == TypeCode.FLOAT32:
-        if value_pb.HasField("string_value"):
-            return float(value_pb.string_value)
-        else:
-            return value_pb.number_value
+        return _parse_float
     elif type_code == TypeCode.DATE:
-        return _date_from_iso8601_date(value_pb.string_value)
+        return _parse_date
     elif type_code == TypeCode.TIMESTAMP:
-        DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
-        return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
-    elif type_code == TypeCode.ARRAY:
-        return [
-            _parse_value_pb(
-                item_pb, field_type.array_element_type, field_name, column_info
-            )
-            for item_pb in value_pb.list_value.values
-        ]
-    elif type_code == TypeCode.STRUCT:
-        return [
-            _parse_value_pb(
-                item_pb, field_type.struct_type.fields[i].type_, field_name, column_info
-            )
-            for (i, item_pb) in enumerate(value_pb.list_value.values)
-        ]
+        return _parse_timestamp
     elif type_code == TypeCode.NUMERIC:
-        return decimal.Decimal(value_pb.string_value)
+        return _parse_numeric
     elif type_code == TypeCode.JSON:
-        return JsonObject.from_str(value_pb.string_value)
+        return _parse_json
     elif type_code == TypeCode.PROTO:
-        bytes_value = base64.b64decode(value_pb.string_value)
-        if column_info is not None and column_info.get(field_name) is not None:
-            default_proto_message = column_info.get(field_name)
-            if isinstance(default_proto_message, Message):
-                proto_message = type(default_proto_message)()
-                proto_message.ParseFromString(bytes_value)
-                return proto_message
-        return bytes_value
+        return lambda value_pb: _parse_proto(value_pb, column_info, field_name)
     elif type_code == TypeCode.ENUM:
-        int_value = int(value_pb.string_value)
-        if column_info is not None and column_info.get(field_name) is not None:
-            proto_enum = column_info.get(field_name)
-            if isinstance(proto_enum, EnumTypeWrapper):
-                return proto_enum.Name(int_value)
-        return int_value
+        return lambda value_pb: _parse_proto_enum(value_pb, column_info, field_name)
+    elif type_code == TypeCode.ARRAY:
+        element_decoder = _get_type_decoder(
+            field_type.array_element_type, field_name, column_info
+        )
+        return lambda value_pb: _parse_array(value_pb, element_decoder)
+    elif type_code == TypeCode.STRUCT:
+        element_decoders = [
+            _get_type_decoder(item_field.type_, field_name, column_info)
+            for item_field in field_type.struct_type.fields
+        ]
+        return lambda value_pb: _parse_struct(value_pb, element_decoders)
     else:
         raise ValueError("Unknown type: %s" % (field_type,))

@@ -351,6 +354,87 @@ def _parse_list_value_pbs(rows, row_type):
     return result


+def _parse_string(value_pb) -> str:
+    return value_pb.string_value
+
+
+def _parse_bytes(value_pb):
+    return value_pb.string_value.encode("utf8")
+
+
+def _parse_bool(value_pb) -> bool:
+    return value_pb.bool_value
+
+
+def _parse_int64(value_pb) -> int:
+    return int(value_pb.string_value)
+
+
+def _parse_float(value_pb) -> float:
+    if value_pb.HasField("string_value"):
+        return float(value_pb.string_value)
+    else:
+        return value_pb.number_value
+
+
+def _parse_date(value_pb):
+    return _date_from_iso8601_date(value_pb.string_value)
+
+
+def _parse_timestamp(value_pb):
+    DatetimeWithNanoseconds = datetime_helpers.DatetimeWithNanoseconds
+    return DatetimeWithNanoseconds.from_rfc3339(value_pb.string_value)
+
+
+def _parse_numeric(value_pb):
+    return decimal.Decimal(value_pb.string_value)
+
+
+def _parse_json(value_pb):
+    return JsonObject.from_str(value_pb.string_value)
+
+
+def _parse_proto(value_pb, column_info, field_name):
+    bytes_value = base64.b64decode(value_pb.string_value)
+    if column_info is not None and column_info.get(field_name) is not None:
+        default_proto_message = column_info.get(field_name)
+        if isinstance(default_proto_message, Message):
+            proto_message = type(default_proto_message)()
+            proto_message.ParseFromString(bytes_value)
+            return proto_message
+    return bytes_value
+
+
+def _parse_proto_enum(value_pb, column_info, field_name):
+    int_value = int(value_pb.string_value)
+    if column_info is not None and column_info.get(field_name) is not None:
+        proto_enum = column_info.get(field_name)
+        if isinstance(proto_enum, EnumTypeWrapper):
+            return proto_enum.Name(int_value)
+    return int_value
+
+
+def _parse_array(value_pb, element_decoder) -> []:
+    return [
+        _parse_nullable(item_pb, element_decoder)
+        for item_pb in value_pb.list_value.values
+    ]
+
+
+def _parse_struct(value_pb, element_decoders):
+    return [
+        _parse_nullable(item_pb, element_decoders[i])
+        for (i, item_pb) in enumerate(value_pb.list_value.values)
+    ]
+
+
+def _parse_nullable(value_pb, decoder):
+    if value_pb.HasField("null_value"):
+        return None
+    else:
+        return decoder(value_pb)
+
+
 class _SessionWrapper(object):
     """Base class for objects wrapping a session.

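The hunks above replace the inline per-value type dispatch in _parse_value_pb with two pieces: _get_type_decoder, which resolves a decoder function from the column's type code, and _parse_nullable, which wraps every decoder with shared NULL handling. The snippet below is a minimal, standalone sketch of that pattern only; get_type_decoder, parse_nullable, the string type codes, and the sample rows are simplified stand-ins for illustration, not the Spanner client's Value/Type protobufs.

import decimal
from datetime import date

def get_type_decoder(type_code):
    """Return a callable that converts a raw wire value to a Python object."""
    decoders = {
        "INT64": int,                 # INT64 arrives as a decimal string on the wire
        "FLOAT64": float,
        "NUMERIC": decimal.Decimal,
        "DATE": date.fromisoformat,
        "STRING": str,
    }
    try:
        return decoders[type_code]
    except KeyError:
        raise ValueError("Unknown type: %s" % (type_code,))

def parse_nullable(raw_value, decoder):
    """Shared null handling applied around every type-specific decoder."""
    return None if raw_value is None else decoder(raw_value)

# Resolve each column's decoder once, then reuse it for every row.
column_types = ["INT64", "NUMERIC", "DATE"]
decoders = [get_type_decoder(code) for code in column_types]
rows = [["42", "9.99", "2024-05-01"], [None, "0.10", None]]
decoded = [
    [parse_nullable(value, dec) for value, dec in zip(row, decoders)]
    for row in rows
]
print(decoded)  # [[42, Decimal('9.99'), datetime.date(2024, 5, 1)], [None, Decimal('0.10'), None]]

The point of the refactor is visible here: the type dispatch runs once per column instead of once per cell, and ARRAY/STRUCT decoding in the real helpers composes element decoders the same way.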
google/cloud/spanner_v1/snapshot.py

Lines changed: 48 additions & 6 deletions
@@ -192,6 +192,7 @@ def read(
         retry=gapic_v1.method.DEFAULT,
         timeout=gapic_v1.method.DEFAULT,
         column_info=None,
+        lazy_decode=False,
     ):
         """Perform a ``StreamingRead`` API request for rows in a table.

@@ -255,6 +256,18 @@ def read(
             If not provided, data remains serialized as bytes for Proto Messages and
             integer for Proto Enums.

+        :type lazy_decode: bool
+        :param lazy_decode:
+            (Optional) If this argument is set to ``true``, the iterator
+            returns the underlying protobuf values instead of decoded Python
+            objects. This reduces the time that is needed to iterate through
+            large result sets. The application is responsible for decoding
+            the data that is needed. The returned row iterator contains two
+            functions that can be used for this. ``iterator.decode_row(row)``
+            decodes all the columns in the given row to an array of Python
+            objects. ``iterator.decode_column(row, column_index)`` decodes one
+            specific column in the given row.
+
         :rtype: :class:`~google.cloud.spanner_v1.streamed.StreamedResultSet`
         :returns: a result set instance which can be used to consume rows.

@@ -330,10 +343,15 @@ def read(
             self._read_request_count += 1
             if self._multi_use:
                 return StreamedResultSet(
-                    iterator, source=self, column_info=column_info
+                    iterator,
+                    source=self,
+                    column_info=column_info,
+                    lazy_decode=lazy_decode,
                 )
             else:
-                return StreamedResultSet(iterator, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, column_info=column_info, lazy_decode=lazy_decode
+                )
         else:
             iterator = _restart_on_unavailable(
                 restart,
@@ -348,9 +366,13 @@ def read(
             self._read_request_count += 1

             if self._multi_use:
-                return StreamedResultSet(iterator, source=self, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
+                )
             else:
-                return StreamedResultSet(iterator, column_info=column_info)
+                return StreamedResultSet(
+                    iterator, column_info=column_info, lazy_decode=lazy_decode
+                )

     def execute_sql(
         self,
@@ -366,6 +388,7 @@ def execute_sql(
         data_boost_enabled=False,
         directed_read_options=None,
         column_info=None,
+        lazy_decode=False,
     ):
         """Perform an ``ExecuteStreamingSql`` API request.

@@ -438,6 +461,18 @@ def execute_sql(
             If not provided, data remains serialized as bytes for Proto Messages and
             integer for Proto Enums.

+        :type lazy_decode: bool
+        :param lazy_decode:
+            (Optional) If this argument is set to ``true``, the iterator
+            returns the underlying protobuf values instead of decoded Python
+            objects. This reduces the time that is needed to iterate through
+            large result sets. The application is responsible for decoding
+            the data that is needed. The returned row iterator contains two
+            functions that can be used for this. ``iterator.decode_row(row)``
+            decodes all the columns in the given row to an array of Python
+            objects. ``iterator.decode_column(row, column_index)`` decodes one
+            specific column in the given row.
+
         :raises ValueError:
             for reuse of single-use snapshots, or if a transaction ID is
             already pending for multiple-use snapshots.
@@ -517,6 +552,7 @@ def execute_sql(
                 trace_attributes,
                 column_info,
                 observability_options,
+                lazy_decode=lazy_decode,
             )
         else:
             return self._get_streamed_result_set(
@@ -525,6 +561,7 @@ def execute_sql(
                 trace_attributes,
                 column_info,
                 observability_options,
+                lazy_decode=lazy_decode,
             )

     def _get_streamed_result_set(
@@ -534,6 +571,7 @@ def _get_streamed_result_set(
         trace_attributes,
         column_info,
         observability_options=None,
+        lazy_decode=False,
     ):
         iterator = _restart_on_unavailable(
             restart,
@@ -548,9 +586,13 @@ def _get_streamed_result_set(
         self._execute_sql_count += 1

         if self._multi_use:
-            return StreamedResultSet(iterator, source=self, column_info=column_info)
+            return StreamedResultSet(
+                iterator, source=self, column_info=column_info, lazy_decode=lazy_decode
+            )
         else:
-            return StreamedResultSet(iterator, column_info=column_info)
+            return StreamedResultSet(
+                iterator, column_info=column_info, lazy_decode=lazy_decode
+            )

     def partition_read(
         self,
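Taken together, the snapshot.py changes thread a new lazy_decode flag from read() and execute_sql() down to StreamedResultSet, which then exposes decode_row and decode_column for on-demand decoding. The sketch below shows how an application might use it, based only on the docstring above; the instance, database, table, and column names are placeholders, and the exact surface should be verified against the released client.

from google.cloud import spanner

client = spanner.Client()
database = client.instance("my-instance").database("my-database")  # hypothetical names

with database.snapshot() as snapshot:
    results = snapshot.execute_sql(
        "SELECT SingerId, SingerName FROM Singers",  # hypothetical table
        lazy_decode=True,  # stream raw protobuf values, skip eager decoding
    )
    for row in results:
        # Decode only the column needed for filtering...
        singer_id = results.decode_column(row, 0)
        if singer_id % 2 == 0:
            # ...and decode the full row only when it is actually used.
            print(results.decode_row(row))

The design rationale given in the docstring is that skipping eager decoding shortens iteration over large result sets, at the cost of making the caller responsible for decoding the values it needs.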
