23
23
import com .google .cloud .spanner .SpannerException ;
24
24
import com .google .cloud .spanner .SpannerExceptionFactory ;
25
25
import com .google .cloud .spanner .Struct ;
26
+ import com .google .cloud .spanner .Type ;
26
27
import com .google .common .base .Preconditions ;
27
28
import com .google .common .base .Supplier ;
28
29
import com .google .spanner .v1 .ResultSetMetadata ;
@@ -61,18 +62,26 @@ public void run() {
61
62
try (ResultSet resultSet = connection .runPartition (partitionId )) {
62
63
boolean first = true ;
63
64
while (resultSet .next ()) {
65
+ Struct row = resultSet .getCurrentRowAsStruct ();
64
66
if (first ) {
65
67
queue .put (
66
68
PartitionExecutorResult .dataAndMetadata (
67
- resultSet .getCurrentRowAsStruct (), resultSet .getMetadata ()));
69
+ row , resultSet .getType (), resultSet .getMetadata ()));
68
70
first = false ;
69
71
} else {
70
- queue .put (PartitionExecutorResult .data (resultSet . getCurrentRowAsStruct () ));
72
+ queue .put (PartitionExecutorResult .data (row ));
71
73
}
72
74
if (shouldStop .get ()) {
73
75
break ;
74
76
}
75
77
}
78
+ if (first ) {
79
+ // Special case: The result set did not return any rows. Push the metadata to the merged
80
+ // result set.
81
+ queue .put (
82
+ PartitionExecutorResult .typeAndMetadata (
83
+ resultSet .getType (), resultSet .getMetadata ()));
84
+ }
76
85
} catch (Throwable exception ) {
77
86
putWithoutInterruptPropagation (PartitionExecutorResult .exception (exception ));
78
87
} finally {
@@ -96,32 +105,47 @@ private void putWithoutInterruptPropagation(PartitionExecutorResult result) {
96
105
static class PartitionExecutorResult {
97
106
private final Struct data ;
98
107
private final Throwable exception ;
108
+ private final Type type ;
99
109
private final ResultSetMetadata metadata ;
100
110
101
111
static PartitionExecutorResult data (Struct data ) {
102
- return new PartitionExecutorResult (data , null , null );
112
+ return new PartitionExecutorResult (data , null , null , null );
113
+ }
114
+
115
+ static PartitionExecutorResult typeAndMetadata (Type type , ResultSetMetadata metadata ) {
116
+ return new PartitionExecutorResult (null , type , metadata , null );
103
117
}
104
118
105
- static PartitionExecutorResult dataAndMetadata (Struct data , ResultSetMetadata metadata ) {
106
- return new PartitionExecutorResult (data , metadata , null );
119
+ static PartitionExecutorResult dataAndMetadata (
120
+ Struct data , Type type , ResultSetMetadata metadata ) {
121
+ return new PartitionExecutorResult (data , type , metadata , null );
107
122
}
108
123
109
124
static PartitionExecutorResult exception (Throwable exception ) {
110
- return new PartitionExecutorResult (null , null , exception );
125
+ return new PartitionExecutorResult (null , null , null , exception );
111
126
}
112
127
113
128
static PartitionExecutorResult finished () {
114
- return new PartitionExecutorResult (null , null , null );
129
+ return new PartitionExecutorResult (null , null , null , null );
115
130
}
116
131
117
- private PartitionExecutorResult (Struct data , ResultSetMetadata metadata , Throwable exception ) {
132
+ private PartitionExecutorResult (
133
+ Struct data , Type type , ResultSetMetadata metadata , Throwable exception ) {
118
134
this .data = data ;
135
+ this .type = type ;
119
136
this .metadata = metadata ;
120
137
this .exception = exception ;
121
138
}
122
139
140
+ boolean hasData () {
141
+ return this .data != null ;
142
+ }
143
+
123
144
boolean isFinished () {
124
- return this .data == null && this .metadata == null && this .exception == null ;
145
+ return this .data == null
146
+ && this .type == null
147
+ && this .metadata == null
148
+ && this .exception == null ;
125
149
}
126
150
}
127
151
@@ -135,6 +159,7 @@ static class RowProducer implements Supplier<Struct> {
135
159
private final AtomicInteger finishedCounter ;
136
160
private final LinkedBlockingDeque <PartitionExecutorResult > queue ;
137
161
private ResultSetMetadata metadata ;
162
+ private Type type ;
138
163
private Struct currentRow ;
139
164
private Throwable exception ;
140
165
@@ -185,8 +210,7 @@ boolean nextRow() throws Throwable {
185
210
PartitionExecutorResult next ;
186
211
if ((next = queue .peek ()) != null && !next .isFinished ()) {
187
212
// There's a valid result available. Return this quickly.
188
- setNextRow (queue .remove ());
189
- return true ;
213
+ return setNextRow (queue .remove ());
190
214
}
191
215
// Block until the next row is available.
192
216
next = queue .take ();
@@ -196,13 +220,12 @@ boolean nextRow() throws Throwable {
196
220
return false ;
197
221
}
198
222
} else {
199
- setNextRow (next );
200
- return true ;
223
+ return setNextRow (next );
201
224
}
202
225
}
203
226
}
204
227
205
- void setNextRow (PartitionExecutorResult next ) throws Throwable {
228
+ boolean setNextRow (PartitionExecutorResult next ) throws Throwable {
206
229
if (next .exception != null ) {
207
230
this .exception = next .exception ;
208
231
throw next .exception ;
@@ -211,6 +234,10 @@ void setNextRow(PartitionExecutorResult next) throws Throwable {
211
234
if (this .metadata == null && next .metadata != null ) {
212
235
this .metadata = next .metadata ;
213
236
}
237
+ if (this .type == null && next .type != null ) {
238
+ this .type = next .type ;
239
+ }
240
+ return next .hasData ();
214
241
}
215
242
216
243
@ Override
@@ -223,6 +250,11 @@ public ResultSetMetadata getMetadata() {
223
250
checkState (metadata != null , "next() call required" );
224
251
return metadata ;
225
252
}
253
+
254
+ public Type getType () {
255
+ checkState (type != null , "next() call required" );
256
+ return type ;
257
+ }
226
258
}
227
259
228
260
private final RowProducer rowProducer ;
@@ -279,6 +311,12 @@ public ResultSetMetadata getMetadata() {
279
311
return rowProducer .getMetadata ();
280
312
}
281
313
314
+ @ Override
315
+ public Type getType () {
316
+ checkValidState ();
317
+ return rowProducer .getType ();
318
+ }
319
+
282
320
@ Override
283
321
public int getNumPartitions () {
284
322
return rowProducer .partitionExecutors .size ();
0 commit comments