Skip to content

Commit 977534f

Browse files
fix(spanner): Skip gRPC trailers for StreamingRead & ExecuteStreamingSql
1 parent 7c714be commit 977534f

File tree

2 files changed

+62
-1
lines changed

2 files changed

+62
-1
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
5252
private TimeUnit streamWaitTimeoutUnit;
5353
private long streamWaitTimeoutValue;
5454
private SpannerException error;
55+
private volatile boolean done;
5556

5657
@VisibleForTesting
5758
GrpcStreamIterator(int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
@@ -166,11 +167,17 @@ private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer {
166167
@Override
167168
public void onPartialResultSet(PartialResultSet results) {
168169
addToStream(results);
170+
if (results.getLast()) {
171+
done = true;
172+
addToStream(END_OF_STREAM);
173+
}
169174
}
170175

171176
@Override
172177
public void onCompleted() {
173-
addToStream(END_OF_STREAM);
178+
if (!done) {
179+
addToStream(END_OF_STREAM);
180+
}
174181
}
175182

176183
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,4 +1115,58 @@ public void getProtoEnumList() {
11151115
resultSet.getProtoEnum(0, Genre::forNumber);
11161116
});
11171117
}
1118+
1119+
@Test
1120+
public void verifyResultSetWithLastTrue() {
1121+
long[] longArray = {111, 333, 444, 0, -1, -2234, Long.MAX_VALUE, Long.MIN_VALUE};
1122+
1123+
consumer.onPartialResultSet(
1124+
PartialResultSet.newBuilder()
1125+
.setMetadata(
1126+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1127+
.addValues(Value.int64Array(longArray).toProto())
1128+
.setLast(false)
1129+
.build());
1130+
assertThat(resultSet.next()).isTrue();
1131+
consumer.onPartialResultSet(
1132+
PartialResultSet.newBuilder()
1133+
.setMetadata(
1134+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1135+
.addValues(Value.int64Array(longArray).toProto())
1136+
.setLast(true)
1137+
.build());
1138+
assertThat(resultSet.next()).isTrue();
1139+
assertThat(resultSet.next()).isFalse();
1140+
consumer.onCompleted();
1141+
}
1142+
1143+
@Test
1144+
public void shouldThrowDeadlineExceededIfLastTrueIsNotReceived() {
1145+
long[] longArray = {111, 333, 444, 0, -1, -2234, Long.MAX_VALUE, Long.MIN_VALUE};
1146+
1147+
consumer.onPartialResultSet(
1148+
PartialResultSet.newBuilder()
1149+
.setMetadata(
1150+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1151+
.addValues(Value.int64Array(longArray).toProto())
1152+
.setLast(false)
1153+
.build());
1154+
assertThat(resultSet.next()).isTrue();
1155+
consumer.onPartialResultSet(
1156+
PartialResultSet.newBuilder()
1157+
.setMetadata(
1158+
makeMetadata(Type.struct(Type.StructField.of("f", Type.array(Type.int64())))))
1159+
.addValues(Value.int64Array(longArray).toProto())
1160+
.setLast(false)
1161+
.build());
1162+
assertThat(resultSet.next()).isTrue();
1163+
SpannerException spannerException =
1164+
assertThrows(
1165+
SpannerException.class,
1166+
() -> {
1167+
assertThat(resultSet.next()).isFalse();
1168+
});
1169+
assertEquals("DEADLINE_EXCEEDED: stream wait timeout", spannerException.getMessage());
1170+
consumer.onCompleted();
1171+
}
11181172
}

0 commit comments

Comments
 (0)