Skip to content

Commit 71996e7

Browse files
author
Roman Mazur
committed
Unsubscribe when thread is interrupted
1 parent 36a3bbd commit 71996e7

File tree

5 files changed

+150
-6
lines changed

5 files changed

+150
-6
lines changed

src/main/java/rx/internal/operators/BlockingOperatorLatest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Notification;
2424
import rx.Observable;
2525
import rx.Subscriber;
26+
import rx.Subscription;
2627
import rx.exceptions.Exceptions;
2728

2829
/**
@@ -49,7 +50,7 @@ public static <T> Iterable<T> latest(final Observable<? extends T> source) {
4950
@Override
5051
public Iterator<T> iterator() {
5152
LatestObserverIterator<T> lio = new LatestObserverIterator<T>();
52-
source.materialize().subscribe(lio);
53+
lio.subscription = source.materialize().subscribe(lio);
5354
return lio;
5455
}
5556
};
@@ -60,6 +61,7 @@ static final class LatestObserverIterator<T> extends Subscriber<Notification<? e
6061
final Semaphore notify = new Semaphore(0);
6162
// observer's notification
6263
volatile Notification<? extends T> value;
64+
volatile Subscription subscription;
6365
/** Updater for the value field. */
6466
@SuppressWarnings("rawtypes")
6567
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
@@ -96,6 +98,7 @@ public boolean hasNext() {
9698
try {
9799
notify.acquire();
98100
} catch (InterruptedException ex) {
101+
subscription.unsubscribe();
99102
Thread.currentThread().interrupt();
100103
iNotif = Notification.createOnError(ex);
101104
throw Exceptions.propagate(ex);

src/main/java/rx/internal/operators/BlockingOperatorNext.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Notification;
2525
import rx.Observable;
2626
import rx.Subscriber;
27+
import rx.Subscription;
2728
import rx.exceptions.Exceptions;
2829

2930
/**
@@ -65,6 +66,7 @@ public Iterator<T> iterator() {
6566
private boolean isNextConsumed = true;
6667
private Throwable error = null;
6768
private boolean started = false;
69+
private Subscription subscription;
6870

6971
private NextIterator(Observable<? extends T> items, NextObserver<T> observer) {
7072
this.items = items;
@@ -96,7 +98,7 @@ private boolean moveToNext() {
9698
started = true;
9799
// if not started, start now
98100
observer.setWaiting(1);
99-
items.materialize().subscribe(observer);
101+
subscription = items.materialize().subscribe(observer);
100102
}
101103

102104
Notification<? extends T> nextNotification = observer.takeNext();
@@ -117,6 +119,8 @@ private boolean moveToNext() {
117119
}
118120
throw new IllegalStateException("Should not reach here");
119121
} catch (InterruptedException e) {
122+
// Subscription is always created before the first observer.takeNext().
123+
subscription.unsubscribe();
120124
Thread.currentThread().interrupt();
121125
error = e;
122126
throw Exceptions.propagate(error);

src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.Notification;
2424
import rx.Observable;
2525
import rx.Subscriber;
26+
import rx.Subscription;
2627
import rx.exceptions.Exceptions;
2728

2829
/**
@@ -49,7 +50,7 @@ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
4950
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();
5051

5152
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
52-
source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
53+
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
5354
@Override
5455
public void onCompleted() {
5556
// ignore
@@ -94,6 +95,7 @@ private Notification<? extends T> take() {
9495
try {
9596
return notifications.take();
9697
} catch (InterruptedException e) {
98+
subscription.unsubscribe();
9799
throw Exceptions.propagate(e);
98100
}
99101
}

src/main/java/rx/observables/BlockingObservable.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import rx.Observable;
2525
import rx.Subscriber;
26+
import rx.Subscription;
2627
import rx.functions.Action1;
2728
import rx.functions.Func1;
2829
import rx.functions.Functions;
@@ -95,7 +96,7 @@ public void forEach(final Action1<? super T> onNext) {
9596
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
9697
* as this is the final subscribe in the chain.
9798
*/
98-
o.subscribe(new Subscriber<T>() {
99+
Subscription subscription = o.subscribe(new Subscriber<T>() {
99100
@Override
100101
public void onCompleted() {
101102
latch.countDown();
@@ -124,6 +125,7 @@ public void onNext(T args) {
124125
try {
125126
latch.await();
126127
} catch (InterruptedException e) {
128+
subscription.unsubscribe();
127129
// set the interrupted flag again so callers can still get it
128130
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
129131
Thread.currentThread().interrupt();
@@ -450,7 +452,7 @@ private T blockForSingle(final Observable<? extends T> observable) {
450452
final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
451453
final CountDownLatch latch = new CountDownLatch(1);
452454

453-
observable.subscribe(new Subscriber<T>() {
455+
Subscription subscription = observable.subscribe(new Subscriber<T>() {
454456
@Override
455457
public void onCompleted() {
456458
latch.countDown();
@@ -471,6 +473,7 @@ public void onNext(final T item) {
471473
try {
472474
latch.await();
473475
} catch (InterruptedException e) {
476+
subscription.unsubscribe();
474477
Thread.currentThread().interrupt();
475478
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
476479
}

src/test/java/rx/observables/BlockingObservableTest.java

Lines changed: 133 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ public void testFirstOrDefault() {
357357

358358
@Test
359359
public void testFirstOrDefaultWithEmpty() {
360-
BlockingObservable<String> observable = BlockingObservable.from(Observable.<String> empty());
360+
BlockingObservable<String> observable = BlockingObservable.from(Observable.<String>empty());
361361
assertEquals("default", observable.firstOrDefault("default"));
362362
}
363363

@@ -410,4 +410,136 @@ public void call() {
410410
}
411411
assertTrue("Timeout means `unsubscribe` is not called", unsubscribe.await(30, TimeUnit.SECONDS));
412412
}
413+
414+
@Test
415+
public void testUnsubscribeFromSingleWhenInterrupted() throws InterruptedException {
416+
new InterruptionTests().assertUnsubscribeIsInvoked("single()", new Action1<BlockingObservable<Void>>() {
417+
@Override
418+
public void call(final BlockingObservable<Void> o) {
419+
o.single();
420+
}
421+
});
422+
}
423+
424+
@Test
425+
public void testUnsubscribeFromForEachWhenInterrupted() throws InterruptedException {
426+
new InterruptionTests().assertUnsubscribeIsInvoked("forEach()", new Action1<BlockingObservable<Void>>() {
427+
@Override
428+
public void call(final BlockingObservable<Void> o) {
429+
o.forEach(new Action1<Void>() {
430+
@Override
431+
public void call(final Void aVoid) {
432+
// nothing
433+
}
434+
});
435+
}
436+
});
437+
}
438+
439+
@Test
440+
public void testUnsubscribeFromFirstWhenInterrupted() throws InterruptedException {
441+
new InterruptionTests().assertUnsubscribeIsInvoked("first()", new Action1<BlockingObservable<Void>>() {
442+
@Override
443+
public void call(final BlockingObservable<Void> o) {
444+
o.first();
445+
}
446+
});
447+
}
448+
449+
@Test
450+
public void testUnsubscribeFromLastWhenInterrupted() throws InterruptedException {
451+
new InterruptionTests().assertUnsubscribeIsInvoked("last()", new Action1<BlockingObservable<Void>>() {
452+
@Override
453+
public void call(final BlockingObservable<Void> o) {
454+
o.last();
455+
}
456+
});
457+
}
458+
459+
@Test
460+
public void testUnsubscribeFromLatestWhenInterrupted() throws InterruptedException {
461+
new InterruptionTests().assertUnsubscribeIsInvoked("latest()", new Action1<BlockingObservable<Void>>() {
462+
@Override
463+
public void call(final BlockingObservable<Void> o) {
464+
o.latest().iterator().next();
465+
}
466+
});
467+
}
468+
469+
@Test
470+
public void testUnsubscribeFromNextWhenInterrupted() throws InterruptedException {
471+
new InterruptionTests().assertUnsubscribeIsInvoked("next()", new Action1<BlockingObservable<Void>>() {
472+
@Override
473+
public void call(final BlockingObservable<Void> o) {
474+
o.next().iterator().next();
475+
}
476+
});
477+
}
478+
479+
@Test
480+
public void testUnsubscribeFromGetIteratorWhenInterrupted() throws InterruptedException {
481+
new InterruptionTests().assertUnsubscribeIsInvoked("getIterator()", new Action1<BlockingObservable<Void>>() {
482+
@Override
483+
public void call(final BlockingObservable<Void> o) {
484+
o.getIterator().next();
485+
}
486+
});
487+
}
488+
489+
@Test
490+
public void testUnsubscribeFromToIterableWhenInterrupted() throws InterruptedException {
491+
new InterruptionTests().assertUnsubscribeIsInvoked("toIterable()", new Action1<BlockingObservable<Void>>() {
492+
@Override
493+
public void call(final BlockingObservable<Void> o) {
494+
o.toIterable().iterator().next();
495+
}
496+
});
497+
}
498+
499+
/** Utilities set for interruption behaviour tests. */
500+
private static class InterruptionTests {
501+
502+
private boolean isUnSubscribed;
503+
private RuntimeException error;
504+
private CountDownLatch latch = new CountDownLatch(1);
505+
506+
private Observable<Void> createObservable() {
507+
return Observable.<Void>never().doOnUnsubscribe(new Action0() {
508+
@Override
509+
public void call() {
510+
isUnSubscribed = true;
511+
}
512+
});
513+
}
514+
515+
private void startBlockingAndInterrupt(final Action1<BlockingObservable<Void>> blockingAction) {
516+
Thread subscriptionThread = new Thread() {
517+
@Override
518+
public void run() {
519+
try {
520+
blockingAction.call(createObservable().toBlocking());
521+
} catch (RuntimeException e) {
522+
if (!(e.getCause() instanceof InterruptedException)) {
523+
error = e;
524+
}
525+
}
526+
latch.countDown();
527+
}
528+
};
529+
subscriptionThread.start();
530+
subscriptionThread.interrupt();
531+
}
532+
533+
void assertUnsubscribeIsInvoked(final String method, final Action1<BlockingObservable<Void>> blockingAction)
534+
throws InterruptedException {
535+
startBlockingAndInterrupt(blockingAction);
536+
assertTrue("Timeout means interruption is not performed", latch.await(30, TimeUnit.SECONDS));
537+
if (error != null) {
538+
throw error;
539+
}
540+
assertTrue("'unsubscribe' is not invoked when thread is interrupted for " + method, isUnSubscribed);
541+
}
542+
543+
}
544+
413545
}

0 commit comments

Comments
 (0)