16
16
package software .amazon .awssdk .utils .cache ;
17
17
18
18
import static java .time .Instant .now ;
19
+ import static java .util .Collections .emptyList ;
19
20
import static org .assertj .core .api .Assertions .assertThat ;
20
21
import static org .junit .jupiter .api .Assertions .assertTrue ;
21
22
import static org .junit .jupiter .api .Assertions .fail ;
@@ -304,13 +305,12 @@ public void nonBlockingPrefetchStrategyDoesNotRefreshUntilItIsCalled() throws In
304
305
305
306
@ Test
306
307
public void threadsAreSharedBetweenNonBlockingInstances () throws InterruptedException {
307
- List <CachedSupplier <String >> css = new ArrayList <>();
308
- try {
309
- // Create 99 concurrent non-blocking instances
308
+ int maxActive = runAndCountThreads (() -> {
309
+ List <CachedSupplier <?>> css = new ArrayList <>();
310
310
for (int i = 0 ; i < 99 ; i ++) {
311
- CachedSupplier <String > supplier =
311
+ CachedSupplier <? > supplier =
312
312
CachedSupplier .builder (() -> RefreshResult .builder ("foo" )
313
- .prefetchTime (now ().plusMillis (1 ))
313
+ .prefetchTime (now ().plusMillis (10 ))
314
314
.staleTime (future ())
315
315
.build ())
316
316
.prefetchStrategy (new NonBlocking ("test" ))
@@ -319,60 +319,78 @@ public void threadsAreSharedBetweenNonBlockingInstances() throws InterruptedExce
319
319
supplier .get ();
320
320
css .add (supplier );
321
321
}
322
+ return css ;
323
+ });
322
324
323
- int maxActive = 0 ;
324
- for (int i = 0 ; i < 1000 ; i ++) {
325
- maxActive = Math .max (maxActive , NonBlocking .executor ().getActiveCount ());
326
- Thread .sleep (1 );
327
- }
328
-
329
- // Make sure we used less-than 99 to do the refreshes.
330
- assertThat (maxActive ).isBetween (1 , 99 );
331
- } finally {
332
- css .forEach (CachedSupplier ::close );
333
- }
325
+ assertThat (maxActive ).isBetween (1 , 99 );
334
326
}
335
327
336
328
@ Test
337
329
public void activeThreadsHaveMaxCount () throws InterruptedException {
338
330
ExecutorService executor = Executors .newCachedThreadPool ();
339
- List <CachedSupplier <String >> css = new ArrayList <>();
340
331
try {
341
- // Create 99 concurrent non-blocking instances
342
- for (int i = 0 ; i < 1000 ; i ++) {
343
- CachedSupplier <String > supplier =
344
- CachedSupplier .builder (() -> {
345
- invokeSafely (() -> Thread .sleep (100 ));
346
- return RefreshResult .builder ("foo" )
347
- .prefetchTime (now ().plusMillis (1 ))
348
- .staleTime (now ().plusSeconds (60 ))
349
- .build ();
350
- }).prefetchStrategy (new NonBlocking ("test" ))
351
- .prefetchJitterEnabled (false )
352
- .build ();
353
- executor .submit (supplier ::get );
354
- css .add (supplier );
355
- }
356
-
357
- executor .shutdown ();
358
- assertThat (executor .awaitTermination (10 , TimeUnit .SECONDS )).isTrue ();
359
-
360
- int maxActive = 0 ;
361
- for (int i = 0 ; i < 1000 ; i ++) {
362
- maxActive = Math .max (maxActive , NonBlocking .executor ().getActiveCount ());
363
- Thread .sleep (1 );
364
- }
332
+ int maxActive = runAndCountThreads (() -> {
333
+ List <CachedSupplier <?>> css = new ArrayList <>();
334
+
335
+ // Create 1000 concurrent non-blocking instances
336
+ for (int i = 0 ; i < 1000 ; i ++) {
337
+ CachedSupplier <String > supplier =
338
+ CachedSupplier .builder (() -> {
339
+ invokeSafely (() -> Thread .sleep (100 ));
340
+ return RefreshResult .builder ("foo" )
341
+ .prefetchTime (now ().plusMillis (10 ))
342
+ .staleTime (now ().plusSeconds (60 ))
343
+ .build ();
344
+ }).prefetchStrategy (new NonBlocking ("test" ))
345
+ .prefetchJitterEnabled (false )
346
+ .build ();
347
+ executor .submit (supplier ::get );
348
+ css .add (supplier );
349
+ }
350
+
351
+ executor .shutdown ();
352
+ assertThat (executor .awaitTermination (10 , TimeUnit .SECONDS )).isTrue ();
353
+ return css ;
354
+ });
365
355
366
- // In a perfect world this would be capped to 100, but the mechanism we use to limit concurrent refreshes usually
367
- // means more than 100 can get created. 150 should be a reasonable limit to check for, because without the limiter
368
- // it would be ~1000.
369
356
assertThat (maxActive ).isBetween (2 , 150 );
370
357
} finally {
371
- css .forEach (CachedSupplier ::close );
372
358
executor .shutdownNow ();
373
359
}
374
360
}
375
361
362
+ /**
363
+ * Run the provided supplier, measure the non-blocking executor thread count, and return the result. If the result is 0,
364
+ * try again. This makes our stochastic tests ~100% reliable instead of ~99%.
365
+ */
366
+ private int runAndCountThreads (ThrowingSupplier suppliersConstructor ) throws InterruptedException {
367
+ for (int attempt = 0 ; attempt < 10 ; attempt ++) {
368
+ Collection <CachedSupplier <?>> suppliers = emptyList ();
369
+ try {
370
+ suppliers = suppliersConstructor .get ();
371
+
372
+ int maxActive = 0 ;
373
+ for (int j = 0 ; j < 1000 ; j ++) {
374
+ maxActive = Math .max (maxActive , NonBlocking .executor ().getActiveCount ());
375
+ Thread .sleep (1 );
376
+ }
377
+
378
+ if (maxActive != 0 ) {
379
+ return maxActive ;
380
+ }
381
+ } finally {
382
+ suppliers .forEach (CachedSupplier ::close );
383
+ }
384
+ }
385
+
386
+ throw new AssertionError ("Thread count never exceeded 0." );
387
+ }
388
+
389
+ @ FunctionalInterface
390
+ interface ThrowingSupplier {
391
+ Collection <CachedSupplier <?>> get () throws InterruptedException ;
392
+ }
393
+
376
394
/**
377
395
* Asynchronously perform a "get" on the provided supplier, returning the future that will be completed when the "get"
378
396
* finishes.
0 commit comments