32
32
import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
33
33
import com .google .cloud .spanner .v1 .stub .SpannerStubSettings ;
34
34
import com .google .common .annotations .VisibleForTesting ;
35
+ import com .google .common .base .Preconditions ;
35
36
import com .google .common .collect .AbstractIterator ;
36
37
import com .google .common .collect .ImmutableMap ;
37
38
import com .google .common .collect .Lists ;
38
39
import com .google .common .util .concurrent .Uninterruptibles ;
39
40
import com .google .protobuf .ByteString ;
40
41
import com .google .protobuf .ListValue ;
42
+ import com .google .protobuf .NullValue ;
41
43
import com .google .protobuf .Value .KindCase ;
42
44
import com .google .spanner .v1 .PartialResultSet ;
43
45
import com .google .spanner .v1 .ResultSetMetadata ;
55
57
import java .math .BigDecimal ;
56
58
import java .util .AbstractList ;
57
59
import java .util .ArrayList ;
60
+ import java .util .Base64 ;
58
61
import java .util .BitSet ;
59
62
import java .util .Collections ;
60
63
import java .util .Iterator ;
61
64
import java .util .LinkedList ;
62
65
import java .util .List ;
66
+ import java .util .Objects ;
63
67
import java .util .concurrent .BlockingQueue ;
64
68
import java .util .concurrent .CountDownLatch ;
65
69
import java .util .concurrent .Executor ;
66
70
import java .util .concurrent .LinkedBlockingQueue ;
67
71
import java .util .concurrent .TimeUnit ;
68
72
import java .util .logging .Level ;
69
73
import java .util .logging .Logger ;
74
+ import java .util .stream .Collectors ;
75
+ import javax .annotation .Nonnull ;
70
76
import javax .annotation .Nullable ;
71
77
72
78
/** Implementation of {@link ResultSet}. */
73
79
abstract class AbstractResultSet <R > extends AbstractStructReader implements ResultSet {
74
80
private static final Tracer tracer = Tracing .getTracer ();
81
+ private static final com .google .protobuf .Value NULL_VALUE =
82
+ com .google .protobuf .Value .newBuilder ().setNullValue (NullValue .NULL_VALUE ).build ();
75
83
76
84
interface Listener {
77
85
/**
@@ -353,6 +361,79 @@ private boolean isMergeable(KindCase kind) {
353
361
}
354
362
}
355
363
364
+ static final class LazyByteArray implements Serializable {
365
+ private static final Base64 .Encoder ENCODER = Base64 .getEncoder ();
366
+ private static final Base64 .Decoder DECODER = Base64 .getDecoder ();
367
+ private final String base64String ;
368
+ private transient AbstractLazyInitializer <ByteArray > byteArray ;
369
+
370
+ LazyByteArray (@ Nonnull String base64String ) {
371
+ this .base64String = Preconditions .checkNotNull (base64String );
372
+ this .byteArray = defaultInitializer ();
373
+ }
374
+
375
+ LazyByteArray (@ Nonnull ByteArray byteArray ) {
376
+ this .base64String =
377
+ ENCODER .encodeToString (Preconditions .checkNotNull (byteArray ).toByteArray ());
378
+ this .byteArray =
379
+ new AbstractLazyInitializer <ByteArray >() {
380
+ @ Override
381
+ protected ByteArray initialize () {
382
+ return byteArray ;
383
+ }
384
+ };
385
+ }
386
+
387
+ private AbstractLazyInitializer <ByteArray > defaultInitializer () {
388
+ return new AbstractLazyInitializer <ByteArray >() {
389
+ @ Override
390
+ protected ByteArray initialize () {
391
+ return ByteArray .copyFrom (DECODER .decode (base64String ));
392
+ }
393
+ };
394
+ }
395
+
396
+ private void readObject (java .io .ObjectInputStream in )
397
+ throws IOException , ClassNotFoundException {
398
+ in .defaultReadObject ();
399
+ byteArray = defaultInitializer ();
400
+ }
401
+
402
+ ByteArray getByteArray () {
403
+ try {
404
+ return byteArray .get ();
405
+ } catch (Throwable t ) {
406
+ throw SpannerExceptionFactory .asSpannerException (t );
407
+ }
408
+ }
409
+
410
+ String getBase64String () {
411
+ return base64String ;
412
+ }
413
+
414
+ @ Override
415
+ public String toString () {
416
+ return getBase64String ();
417
+ }
418
+
419
+ @ Override
420
+ public int hashCode () {
421
+ return base64String .hashCode ();
422
+ }
423
+
424
+ @ Override
425
+ public boolean equals (Object o ) {
426
+ if (o instanceof LazyByteArray ) {
427
+ return lazyByteArraysEqual ((LazyByteArray ) o );
428
+ }
429
+ return false ;
430
+ }
431
+
432
+ private boolean lazyByteArraysEqual (LazyByteArray other ) {
433
+ return Objects .equals (getBase64String (), other .getBase64String ());
434
+ }
435
+ }
436
+
356
437
static class GrpcStruct extends Struct implements Serializable {
357
438
private final Type type ;
358
439
private final List <Object > rowData ;
@@ -395,7 +476,11 @@ private Object writeReplace() {
395
476
builder .set (fieldName ).to (Value .pgJsonb ((String ) value ));
396
477
break ;
397
478
case BYTES :
398
- builder .set (fieldName ).to ((ByteArray ) value );
479
+ builder
480
+ .set (fieldName )
481
+ .to (
482
+ Value .bytesFromBase64 (
483
+ value == null ? null : ((LazyByteArray ) value ).getBase64String ()));
399
484
break ;
400
485
case TIMESTAMP :
401
486
builder .set (fieldName ).to ((Timestamp ) value );
@@ -431,7 +516,17 @@ private Object writeReplace() {
431
516
builder .set (fieldName ).toPgJsonbArray ((Iterable <String >) value );
432
517
break ;
433
518
case BYTES :
434
- builder .set (fieldName ).toBytesArray ((Iterable <ByteArray >) value );
519
+ builder
520
+ .set (fieldName )
521
+ .toBytesArrayFromBase64 (
522
+ value == null
523
+ ? null
524
+ : ((List <LazyByteArray >) value )
525
+ .stream ()
526
+ .map (
527
+ element ->
528
+ element == null ? null : element .getBase64String ())
529
+ .collect (Collectors .toList ()));
435
530
break ;
436
531
case TIMESTAMP :
437
532
builder .set (fieldName ).toTimestampArray ((Iterable <Timestamp >) value );
@@ -511,7 +606,7 @@ private static Object decodeValue(Type fieldType, com.google.protobuf.Value prot
511
606
return proto .getStringValue ();
512
607
case BYTES :
513
608
checkType (fieldType , proto , KindCase .STRING_VALUE );
514
- return ByteArray . fromBase64 (proto .getStringValue ());
609
+ return new LazyByteArray (proto .getStringValue ());
515
610
case TIMESTAMP :
516
611
checkType (fieldType , proto , KindCase .STRING_VALUE );
517
612
return Timestamp .parseTimestamp (proto .getStringValue ());
@@ -526,6 +621,8 @@ private static Object decodeValue(Type fieldType, com.google.protobuf.Value prot
526
621
checkType (fieldType , proto , KindCase .LIST_VALUE );
527
622
ListValue structValue = proto .getListValue ();
528
623
return decodeStructValue (fieldType , structValue );
624
+ case UNRECOGNIZED :
625
+ return proto ;
529
626
default :
530
627
throw new AssertionError ("Unhandled type code: " + fieldType .getCode ());
531
628
}
@@ -634,7 +731,11 @@ protected String getPgJsonbInternal(int columnIndex) {
634
731
635
732
@ Override
636
733
protected ByteArray getBytesInternal (int columnIndex ) {
637
- return (ByteArray ) rowData .get (columnIndex );
734
+ return getLazyBytesInternal (columnIndex ).getByteArray ();
735
+ }
736
+
737
+ LazyByteArray getLazyBytesInternal (int columnIndex ) {
738
+ return (LazyByteArray ) rowData .get (columnIndex );
638
739
}
639
740
640
741
@ Override
@@ -647,6 +748,10 @@ protected Date getDateInternal(int columnIndex) {
647
748
return (Date ) rowData .get (columnIndex );
648
749
}
649
750
751
+ protected com .google .protobuf .Value getProtoValueInternal (int columnIndex ) {
752
+ return (com .google .protobuf .Value ) rowData .get (columnIndex );
753
+ }
754
+
650
755
@ Override
651
756
protected Value getValueInternal (int columnIndex ) {
652
757
final List <Type .StructField > structFields = getType ().getStructFields ();
@@ -671,13 +776,16 @@ protected Value getValueInternal(int columnIndex) {
671
776
case PG_JSONB :
672
777
return Value .pgJsonb (isNull ? null : getPgJsonbInternal (columnIndex ));
673
778
case BYTES :
674
- return Value .bytes (isNull ? null : getBytesInternal (columnIndex ));
779
+ return Value .internalBytes (isNull ? null : getLazyBytesInternal (columnIndex ));
675
780
case TIMESTAMP :
676
781
return Value .timestamp (isNull ? null : getTimestampInternal (columnIndex ));
677
782
case DATE :
678
783
return Value .date (isNull ? null : getDateInternal (columnIndex ));
679
784
case STRUCT :
680
785
return Value .struct (isNull ? null : getStructInternal (columnIndex ));
786
+ case UNRECOGNIZED :
787
+ return Value .unrecognized (
788
+ isNull ? NULL_VALUE : getProtoValueInternal (columnIndex ), columnType );
681
789
case ARRAY :
682
790
final Type elementType = columnType .getArrayElementType ();
683
791
switch (elementType .getCode ()) {
@@ -785,9 +893,10 @@ protected List<String> getPgJsonbListInternal(int columnIndex) {
785
893
}
786
894
787
895
@ Override
788
- @ SuppressWarnings ("unchecked" ) // We know ARRAY<BYTES> produces a List<ByteArray >.
896
+ @ SuppressWarnings ("unchecked" ) // We know ARRAY<BYTES> produces a List<LazyByteArray >.
789
897
protected List <ByteArray > getBytesListInternal (int columnIndex ) {
790
- return Collections .unmodifiableList ((List <ByteArray >) rowData .get (columnIndex ));
898
+ return Lists .transform (
899
+ (List <LazyByteArray >) rowData .get (columnIndex ), l -> l == null ? null : l .getByteArray ());
791
900
}
792
901
793
902
@ Override
0 commit comments