Skip to content

Commit 9b77f42

Browse files
committed
Merge pull request #3707 from akarnokd/CompletableLambdaIsUnsubscribed1x
1.x: make Completable.subscribe() report isUnsubscribed consistently
2 parents ee9956a + 00433f3 commit 9b77f42

File tree

2 files changed

+145
-1
lines changed

2 files changed

+145
-1
lines changed

src/main/java/rx/Completable.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1828,12 +1828,13 @@ public final Subscription subscribe() {
18281828
subscribe(new CompletableSubscriber() {
18291829
@Override
18301830
public void onCompleted() {
1831-
// nothing to do
1831+
mad.unsubscribe();
18321832
}
18331833

18341834
@Override
18351835
public void onError(Throwable e) {
18361836
ERROR_HANDLER.handleError(e);
1837+
mad.unsubscribe();
18371838
}
18381839

18391840
@Override
@@ -1864,11 +1865,13 @@ public void onCompleted() {
18641865
} catch (Throwable e) {
18651866
ERROR_HANDLER.handleError(e);
18661867
}
1868+
mad.unsubscribe();
18671869
}
18681870

18691871
@Override
18701872
public void onError(Throwable e) {
18711873
ERROR_HANDLER.handleError(e);
1874+
mad.unsubscribe();
18721875
}
18731876

18741877
@Override
@@ -1900,7 +1903,9 @@ public void onCompleted() {
19001903
onComplete.call();
19011904
} catch (Throwable e) {
19021905
onError(e);
1906+
return;
19031907
}
1908+
mad.unsubscribe();
19041909
}
19051910

19061911
@Override
@@ -1911,6 +1916,7 @@ public void onError(Throwable e) {
19111916
e = new CompositeException(Arrays.asList(e, ex));
19121917
ERROR_HANDLER.handleError(e);
19131918
}
1919+
mad.unsubscribe();
19141920
}
19151921

19161922
@Override

src/test/java/rx/CompletableTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3604,4 +3604,142 @@ public Completable call(Integer t) {
36043604
assertTrue(listEx.get(1).toString(), listEx.get(1) instanceof TestException);
36053605
}
36063606

3607+
@Test
3608+
public void subscribeReportsUnsubscribed() {
3609+
PublishSubject<String> stringSubject = PublishSubject.create();
3610+
Completable completable = stringSubject.toCompletable();
3611+
3612+
Subscription completableSubscription = completable.subscribe();
3613+
3614+
stringSubject.onCompleted();
3615+
3616+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3617+
}
3618+
3619+
@Test
3620+
public void subscribeReportsUnsubscribedOnError() {
3621+
PublishSubject<String> stringSubject = PublishSubject.create();
3622+
Completable completable = stringSubject.toCompletable();
3623+
3624+
Subscription completableSubscription = completable.subscribe();
3625+
3626+
stringSubject.onError(new TestException());
3627+
3628+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3629+
}
3630+
3631+
@Test
3632+
public void subscribeActionReportsUnsubscribed() {
3633+
PublishSubject<String> stringSubject = PublishSubject.create();
3634+
Completable completable = stringSubject.toCompletable();
3635+
3636+
Subscription completableSubscription = completable.subscribe(Actions.empty());
3637+
3638+
stringSubject.onCompleted();
3639+
3640+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3641+
}
3642+
3643+
@Test
3644+
public void subscribeActionReportsUnsubscribedAfter() {
3645+
PublishSubject<String> stringSubject = PublishSubject.create();
3646+
Completable completable = stringSubject.toCompletable();
3647+
3648+
final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
3649+
Subscription completableSubscription = completable.subscribe(new Action0() {
3650+
@Override
3651+
public void call() {
3652+
if (subscriptionRef.get().isUnsubscribed()) {
3653+
subscriptionRef.set(null);
3654+
}
3655+
}
3656+
});
3657+
subscriptionRef.set(completableSubscription);
3658+
3659+
stringSubject.onCompleted();
3660+
3661+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3662+
assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get());
3663+
}
3664+
3665+
@Test
3666+
public void subscribeActionReportsUnsubscribedOnError() {
3667+
PublishSubject<String> stringSubject = PublishSubject.create();
3668+
Completable completable = stringSubject.toCompletable();
3669+
3670+
Subscription completableSubscription = completable.subscribe(Actions.empty());
3671+
3672+
stringSubject.onError(new TestException());
3673+
3674+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3675+
}
3676+
3677+
@Test
3678+
public void subscribeAction2ReportsUnsubscribed() {
3679+
PublishSubject<String> stringSubject = PublishSubject.create();
3680+
Completable completable = stringSubject.toCompletable();
3681+
3682+
Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty());
3683+
3684+
stringSubject.onCompleted();
3685+
3686+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3687+
}
3688+
3689+
@Test
3690+
public void subscribeAction2ReportsUnsubscribedOnError() {
3691+
PublishSubject<String> stringSubject = PublishSubject.create();
3692+
Completable completable = stringSubject.toCompletable();
3693+
3694+
Subscription completableSubscription = completable.subscribe(Actions.empty(), Actions.empty());
3695+
3696+
stringSubject.onError(new TestException());
3697+
3698+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3699+
}
3700+
3701+
@Test
3702+
public void subscribeAction2ReportsUnsubscribedAfter() {
3703+
PublishSubject<String> stringSubject = PublishSubject.create();
3704+
Completable completable = stringSubject.toCompletable();
3705+
3706+
final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
3707+
Subscription completableSubscription = completable.subscribe(Actions.empty(), new Action0() {
3708+
@Override
3709+
public void call() {
3710+
if (subscriptionRef.get().isUnsubscribed()) {
3711+
subscriptionRef.set(null);
3712+
}
3713+
}
3714+
});
3715+
subscriptionRef.set(completableSubscription);
3716+
3717+
stringSubject.onCompleted();
3718+
3719+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3720+
assertNotNull("Unsubscribed before the call to onCompleted", subscriptionRef.get());
3721+
}
3722+
3723+
@Test
3724+
public void subscribeAction2ReportsUnsubscribedOnErrorAfter() {
3725+
PublishSubject<String> stringSubject = PublishSubject.create();
3726+
Completable completable = stringSubject.toCompletable();
3727+
3728+
final AtomicReference<Subscription> subscriptionRef = new AtomicReference<Subscription>();
3729+
Subscription completableSubscription = completable.subscribe(new Action1<Throwable>() {
3730+
@Override
3731+
public void call(Throwable e) {
3732+
if (subscriptionRef.get().isUnsubscribed()) {
3733+
subscriptionRef.set(null);
3734+
}
3735+
}
3736+
}, Actions.empty());
3737+
subscriptionRef.set(completableSubscription);
3738+
3739+
stringSubject.onError(new TestException());
3740+
3741+
assertTrue("Not unsubscribed?", completableSubscription.isUnsubscribed());
3742+
assertNotNull("Unsubscribed before the call to onError", subscriptionRef.get());
3743+
}
3744+
36073745
}

0 commit comments

Comments
 (0)