@@ -410,4 +410,136 @@ public void call() {
410
410
}
411
411
assertTrue ("Timeout means `unsubscribe` is not called" , unsubscribe .await (30 , TimeUnit .SECONDS ));
412
412
}
413
+
414
+ @ Test
415
+ public void testUnsubscribeFromSingleWhenInterrupted () throws InterruptedException {
416
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("single()" , new Action1 <BlockingObservable <Void >>() {
417
+ @ Override
418
+ public void call (final BlockingObservable <Void > o ) {
419
+ o .single ();
420
+ }
421
+ });
422
+ }
423
+
424
+ @ Test
425
+ public void testUnsubscribeFromForEachWhenInterrupted () throws InterruptedException {
426
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("forEach()" , new Action1 <BlockingObservable <Void >>() {
427
+ @ Override
428
+ public void call (final BlockingObservable <Void > o ) {
429
+ o .forEach (new Action1 <Void >() {
430
+ @ Override
431
+ public void call (final Void aVoid ) {
432
+ // nothing
433
+ }
434
+ });
435
+ }
436
+ });
437
+ }
438
+
439
+ @ Test
440
+ public void testUnsubscribeFromFirstWhenInterrupted () throws InterruptedException {
441
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("first()" , new Action1 <BlockingObservable <Void >>() {
442
+ @ Override
443
+ public void call (final BlockingObservable <Void > o ) {
444
+ o .first ();
445
+ }
446
+ });
447
+ }
448
+
449
+ @ Test
450
+ public void testUnsubscribeFromLastWhenInterrupted () throws InterruptedException {
451
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("last()" , new Action1 <BlockingObservable <Void >>() {
452
+ @ Override
453
+ public void call (final BlockingObservable <Void > o ) {
454
+ o .last ();
455
+ }
456
+ });
457
+ }
458
+
459
+ @ Test
460
+ public void testUnsubscribeFromLatestWhenInterrupted () throws InterruptedException {
461
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("latest()" , new Action1 <BlockingObservable <Void >>() {
462
+ @ Override
463
+ public void call (final BlockingObservable <Void > o ) {
464
+ o .latest ().iterator ().next ();
465
+ }
466
+ });
467
+ }
468
+
469
+ @ Test
470
+ public void testUnsubscribeFromNextWhenInterrupted () throws InterruptedException {
471
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("next()" , new Action1 <BlockingObservable <Void >>() {
472
+ @ Override
473
+ public void call (final BlockingObservable <Void > o ) {
474
+ o .next ().iterator ().next ();
475
+ }
476
+ });
477
+ }
478
+
479
+ @ Test
480
+ public void testUnsubscribeFromGetIteratorWhenInterrupted () throws InterruptedException {
481
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("getIterator()" , new Action1 <BlockingObservable <Void >>() {
482
+ @ Override
483
+ public void call (final BlockingObservable <Void > o ) {
484
+ o .getIterator ().next ();
485
+ }
486
+ });
487
+ }
488
+
489
+ @ Test
490
+ public void testUnsubscribeFromToIterableWhenInterrupted () throws InterruptedException {
491
+ new InterruptionTests ().assertUnsubscribeIsInvoked ("toIterable()" , new Action1 <BlockingObservable <Void >>() {
492
+ @ Override
493
+ public void call (final BlockingObservable <Void > o ) {
494
+ o .toIterable ().iterator ().next ();
495
+ }
496
+ });
497
+ }
498
+
499
+ /** Utilities set for interruption behaviour tests. */
500
+ private static class InterruptionTests {
501
+
502
+ private boolean isUnSubscribed ;
503
+ private RuntimeException error ;
504
+ private CountDownLatch latch = new CountDownLatch (1 );
505
+
506
+ private Observable <Void > createObservable () {
507
+ return Observable .<Void >never ().doOnUnsubscribe (new Action0 () {
508
+ @ Override
509
+ public void call () {
510
+ isUnSubscribed = true ;
511
+ }
512
+ });
513
+ }
514
+
515
+ private void startBlockingAndInterrupt (final Action1 <BlockingObservable <Void >> blockingAction ) {
516
+ Thread subscriptionThread = new Thread () {
517
+ @ Override
518
+ public void run () {
519
+ try {
520
+ blockingAction .call (createObservable ().toBlocking ());
521
+ } catch (RuntimeException e ) {
522
+ if (!(e .getCause () instanceof InterruptedException )) {
523
+ error = e ;
524
+ }
525
+ }
526
+ latch .countDown ();
527
+ }
528
+ };
529
+ subscriptionThread .start ();
530
+ subscriptionThread .interrupt ();
531
+ }
532
+
533
+ void assertUnsubscribeIsInvoked (final String method , final Action1 <BlockingObservable <Void >> blockingAction )
534
+ throws InterruptedException {
535
+ startBlockingAndInterrupt (blockingAction );
536
+ assertTrue ("Timeout means interruption is not performed" , latch .await (30 , TimeUnit .SECONDS ));
537
+ if (error != null ) {
538
+ throw error ;
539
+ }
540
+ assertTrue ("'unsubscribe' is not invoked when thread is interrupted for " + method , isUnSubscribed );
541
+ }
542
+
543
+ }
544
+
413
545
}
0 commit comments