Skip to content

Commit 6e267fb

Browse files
fix: also shutdown the stream connection in case the timeout exception is (#2445)
triggered.
1 parent 43fd699 commit 6e267fb

File tree

2 files changed

+24
-5
lines changed

2 files changed

+24
-5
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -353,7 +353,8 @@ public void run() {
353353
} finally {
354354
lock.unlock();
355355
}
356-
cleanupInflightRequests();
356+
cleanupConnectionAndRequests(
357+
/* avoidBlocking= */ true); // don't perform blocking operations while on user thread
357358
});
358359
this.appendThread.start();
359360
}
@@ -812,7 +813,10 @@ private void appendLoop() {
812813
this.streamConnection.send(originalRequestBuilder.build());
813814
}
814815
}
816+
cleanupConnectionAndRequests(/* avoidBlocking= */ false);
817+
}
815818

819+
private void cleanupConnectionAndRequests(boolean avoidBlocking) {
816820
log.info(
817821
"Cleanup starts. Stream: "
818822
+ streamName
@@ -828,7 +832,9 @@ private void appendLoop() {
828832
// We can close the stream connection and handle the remaining inflight requests.
829833
if (streamConnection != null) {
830834
this.streamConnection.close();
831-
waitForDoneCallback(3, TimeUnit.MINUTES);
835+
if (!avoidBlocking) {
836+
waitForDoneCallback(3, TimeUnit.MINUTES);
837+
}
832838
}
833839

834840
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -650,14 +650,15 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
650650
null,
651651
client.getSettings(),
652652
retrySettings);
653-
testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
653+
org.threeten.bp.Duration durationSleep = org.threeten.bp.Duration.ofSeconds(2);
654+
testBigQueryWrite.setResponseSleep(durationSleep);
654655

655-
long appendCount = 10;
656+
long appendCount = 2;
656657
for (int i = 0; i < appendCount; i++) {
657658
testBigQueryWrite.addResponse(createAppendResponse(i));
658659
}
659660

660-
// In total insert 5 requests,
661+
// In total insert 'appendCount' requests,
661662
List<ApiFuture<AppendRowsResponse>> futures = new ArrayList<>();
662663
for (int i = 0; i < appendCount; i++) {
663664
futures.add(
@@ -691,6 +692,18 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
691692
100)
692693
.get());
693694
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
695+
696+
// Verify we can shutdown normally within the expected time.
697+
long startCloseTime = System.currentTimeMillis();
698+
connectionWorker.close();
699+
long timeDiff = System.currentTimeMillis() - startCloseTime;
700+
assertTrue(
701+
"timeDiff: "
702+
+ timeDiff
703+
+ " is more than total durationSleep: "
704+
+ (appendCount * durationSleep.toMillis()),
705+
timeDiff <= (appendCount * durationSleep.toMillis()));
706+
assertTrue(connectionWorker.isUserClosed());
694707
}
695708

696709
@Test

0 commit comments

Comments
 (0)