27
27
import com .google .api .gax .longrunning .OperationTimedPollAlgorithm ;
28
28
import com .google .api .gax .retrying .RetrySettings ;
29
29
import com .google .cloud .spanner .ErrorCode ;
30
+ import com .google .cloud .spanner .ForceCloseSpannerFunction ;
30
31
import com .google .cloud .spanner .MockSpannerServiceImpl .SimulatedExecutionTime ;
31
32
import com .google .cloud .spanner .ResultSet ;
33
+ import com .google .cloud .spanner .SessionPoolOptions ;
32
34
import com .google .cloud .spanner .SpannerException ;
33
35
import com .google .cloud .spanner .SpannerExceptionFactory ;
34
36
import com .google .cloud .spanner .Statement ;
35
37
import com .google .cloud .spanner .connection .AbstractConnectionImplTest .ConnectionConsumer ;
36
38
import com .google .cloud .spanner .connection .ITAbstractSpannerTest .ITConnection ;
39
+ import com .google .cloud .spanner .connection .SpannerPool .CheckAndCloseSpannersMode ;
37
40
import com .google .cloud .spanner .connection .StatementExecutor .StatementExecutorType ;
38
41
import com .google .common .base .Stopwatch ;
39
- import com .google .common .collect .Collections2 ;
40
42
import com .google .longrunning .Operation ;
41
43
import com .google .protobuf .AbstractMessage ;
42
44
import com .google .protobuf .Any ;
47
49
import com .google .spanner .v1 .ExecuteSqlRequest ;
48
50
import io .grpc .Status ;
49
51
import java .time .Duration ;
52
+ import java .util .ArrayList ;
53
+ import java .util .ConcurrentModificationException ;
54
+ import java .util .List ;
50
55
import java .util .concurrent .CountDownLatch ;
51
56
import java .util .concurrent .ExecutionException ;
52
57
import java .util .concurrent .ExecutorService ;
@@ -108,18 +113,23 @@ protected ITConnection createConnection() {
108
113
.setUri (getBaseUrl () + ";trackSessionLeaks=false" )
109
114
.setStatementExecutorType (statementExecutorType )
110
115
.setConfigurator (
111
- optionsConfigurator ->
112
- optionsConfigurator
113
- .getDatabaseAdminStubSettingsBuilder ()
114
- .updateDatabaseDdlOperationSettings ()
115
- .setPollingAlgorithm (
116
- OperationTimedPollAlgorithm .create (
117
- RetrySettings .newBuilder ()
118
- .setInitialRetryDelayDuration (Duration .ofMillis (1L ))
119
- .setMaxRetryDelayDuration (Duration .ofMillis (1L ))
120
- .setRetryDelayMultiplier (1.0 )
121
- .setTotalTimeoutDuration (Duration .ofMinutes (10L ))
122
- .build ())))
116
+ optionsConfigurator -> {
117
+ optionsConfigurator
118
+ .getDatabaseAdminStubSettingsBuilder ()
119
+ .updateDatabaseDdlOperationSettings ()
120
+ .setPollingAlgorithm (
121
+ OperationTimedPollAlgorithm .create (
122
+ RetrySettings .newBuilder ()
123
+ .setInitialRetryDelayDuration (Duration .ofMillis (1L ))
124
+ .setMaxRetryDelayDuration (Duration .ofMillis (1L ))
125
+ .setRetryDelayMultiplier (1.0 )
126
+ .setTotalTimeoutDuration (Duration .ofMinutes (10L ))
127
+ .build ()));
128
+ optionsConfigurator .setSessionPoolOption (
129
+ SessionPoolOptions .newBuilder ()
130
+ .setWaitForMinSessionsDuration (Duration .ofSeconds (5L ))
131
+ .build ());
132
+ })
123
133
.build ();
124
134
return createITConnection (options );
125
135
}
@@ -138,6 +148,8 @@ public void setup() {
138
148
@ After
139
149
public void clearExecutionTimes () {
140
150
mockSpanner .removeAllExecutionTimes ();
151
+ SpannerPool .INSTANCE .checkAndCloseSpanners (
152
+ CheckAndCloseSpannersMode .ERROR , new ForceCloseSpannerFunction (5L , TimeUnit .MILLISECONDS ));
141
153
}
142
154
143
155
@ Test
@@ -617,20 +629,20 @@ static void waitForRequestsToContain(Class<? extends AbstractMessage> request) {
617
629
private void waitForDdlRequestOnServer () {
618
630
try {
619
631
Stopwatch watch = Stopwatch .createStarted ();
620
- while (Collections2 .filter (
621
- mockDatabaseAdmin .getRequests (),
622
- input -> input .getClass ().equals (UpdateDatabaseDdlRequest .class ))
623
- .isEmpty ()) {
632
+ while (watch .elapsed (TimeUnit .MILLISECONDS ) < EXECUTION_TIME_SLOW_STATEMENT ) {
633
+ try {
634
+ List <AbstractMessage > requests = new ArrayList <>(mockDatabaseAdmin .getRequests ());
635
+ if (requests .stream ().anyMatch (request -> request instanceof UpdateDatabaseDdlRequest )) {
636
+ break ;
637
+ }
638
+ } catch (ConcurrentModificationException ignore ) {
639
+ // Just ignore and retry.
640
+ }
624
641
//noinspection BusyWait
625
642
Thread .sleep (1L );
626
- if (watch .elapsed (TimeUnit .MILLISECONDS ) > EXECUTION_TIME_SLOW_STATEMENT ) {
627
- throw new TimeoutException ("Timeout while waiting for DDL request" );
628
- }
629
643
}
630
644
} catch (InterruptedException e ) {
631
645
throw SpannerExceptionFactory .propagateInterrupt (e );
632
- } catch (TimeoutException e ) {
633
- throw SpannerExceptionFactory .propagateTimeout (e );
634
646
}
635
647
}
636
648
@@ -1010,6 +1022,7 @@ public void testCancelDdlBatch() {
1010
1022
} finally {
1011
1023
executor .shutdownNow ();
1012
1024
}
1025
+ connection .closeAsync ();
1013
1026
}
1014
1027
}
1015
1028
@@ -1036,6 +1049,7 @@ public void testCancelDdlAutocommit() {
1036
1049
} finally {
1037
1050
executor .shutdownNow ();
1038
1051
}
1052
+ connection .closeAsync ();
1039
1053
}
1040
1054
}
1041
1055
@@ -1049,6 +1063,8 @@ public void testTimeoutExceptionDdlAutocommit() {
1049
1063
SpannerException e =
1050
1064
assertThrows (SpannerException .class , () -> connection .execute (Statement .of (SLOW_DDL )));
1051
1065
assertEquals (ErrorCode .DEADLINE_EXCEEDED , e .getErrorCode ());
1066
+
1067
+ connection .closeAsync ();
1052
1068
}
1053
1069
}
1054
1070
0 commit comments