25
25
import com .google .api .client .util .BackOff ;
26
26
import com .google .api .client .util .ExponentialBackOff ;
27
27
import com .google .api .gax .retrying .RetrySettings ;
28
+ import com .google .api .gax .rpc .ApiCallContext ;
28
29
import com .google .cloud .ByteArray ;
29
30
import com .google .cloud .Date ;
30
31
import com .google .cloud .Timestamp ;
74
75
import java .util .stream .Collectors ;
75
76
import javax .annotation .Nonnull ;
76
77
import javax .annotation .Nullable ;
78
+ import org .threeten .bp .Duration ;
77
79
78
80
/** Implementation of {@link ResultSet}. */
79
81
abstract class AbstractResultSet <R > extends AbstractStructReader implements ResultSet {
@@ -944,6 +946,8 @@ static class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
944
946
945
947
private SpannerRpc .StreamingCall call ;
946
948
private volatile boolean withBeginTransaction ;
949
+ private TimeUnit streamWaitTimeoutUnit ;
950
+ private long streamWaitTimeoutValue ;
947
951
private SpannerException error ;
948
952
949
953
@ VisibleForTesting
@@ -965,6 +969,22 @@ protected final SpannerRpc.ResultStreamConsumer consumer() {
965
969
public void setCall (SpannerRpc .StreamingCall call , boolean withBeginTransaction ) {
966
970
this .call = call ;
967
971
this .withBeginTransaction = withBeginTransaction ;
972
+ ApiCallContext callContext = call .getCallContext ();
973
+ Duration streamWaitTimeout = callContext == null ? null : callContext .getStreamWaitTimeout ();
974
+ if (streamWaitTimeout != null ) {
975
+ // Determine the timeout unit to use. This reduces the precision to seconds if the timeout
976
+ // value is more than 1 second, which is lower than the precision that would normally be
977
+ // used by the stream watchdog (which uses a precision of 10 seconds by default).
978
+ if (streamWaitTimeout .getSeconds () > 0L ) {
979
+ streamWaitTimeoutValue = streamWaitTimeout .getSeconds ();
980
+ streamWaitTimeoutUnit = TimeUnit .SECONDS ;
981
+ } else if (streamWaitTimeout .getNano () > 0 ) {
982
+ streamWaitTimeoutValue = streamWaitTimeout .getNano ();
983
+ streamWaitTimeoutUnit = TimeUnit .NANOSECONDS ;
984
+ }
985
+ // Note that if the stream-wait-timeout is zero, we won't set a timeout at all.
986
+ // That is consistent with ApiCallContext#withStreamWaitTimeout(Duration.ZERO).
987
+ }
968
988
}
969
989
970
990
@ Override
@@ -983,11 +1003,15 @@ public boolean isWithBeginTransaction() {
983
1003
protected final PartialResultSet computeNext () {
984
1004
PartialResultSet next ;
985
1005
try {
986
- // TODO: Ideally honor io.grpc.Context while blocking here. In practice,
987
- // cancellation/deadline results in an error being delivered to "stream", which
988
- // should mean that we do not block significantly longer afterwards, but it would
989
- // be more robust to use poll() with a timeout.
990
- next = stream .take ();
1006
+ if (streamWaitTimeoutUnit != null ) {
1007
+ next = stream .poll (streamWaitTimeoutValue , streamWaitTimeoutUnit );
1008
+ if (next == null ) {
1009
+ throw SpannerExceptionFactory .newSpannerException (
1010
+ ErrorCode .DEADLINE_EXCEEDED , "stream wait timeout" );
1011
+ }
1012
+ } else {
1013
+ next = stream .take ();
1014
+ }
991
1015
} catch (InterruptedException e ) {
992
1016
// Treat interrupt as a request to cancel the read.
993
1017
throw SpannerExceptionFactory .propagateInterrupt (e );
0 commit comments