Skip to content

Commit 6c58557

Browse files
authored
2.x: add Single.unsubscribeOn() (#5302)
* 2.x: add Single.unsubscribeOn() * Fix experimental marker location
1 parent db62772 commit 6c58557

File tree

3 files changed

+247
-1
lines changed

3 files changed

+247
-1
lines changed

src/main/java/io/reactivex/Single.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
import java.util.NoSuchElementException;
1717
import java.util.concurrent.*;
1818

19+
import org.reactivestreams.Publisher;
20+
1921
import io.reactivex.annotations.*;
2022
import io.reactivex.disposables.Disposable;
2123
import io.reactivex.exceptions.Exceptions;
@@ -32,7 +34,6 @@
3234
import io.reactivex.observers.TestObserver;
3335
import io.reactivex.plugins.RxJavaPlugins;
3436
import io.reactivex.schedulers.Schedulers;
35-
import org.reactivestreams.Publisher;
3637

3738
/**
3839
* The Single class implements the Reactive Pattern for a single value response.
@@ -3067,6 +3068,26 @@ public final Observable<T> toObservable() {
30673068
return RxJavaPlugins.onAssembly(new SingleToObservable<T>(this));
30683069
}
30693070

3071+
/**
3072+
* Returns a Single which makes sure when a SingleObserver disposes the Disposable,
3073+
* that call is propagated up on the specified scheduler
3074+
* <dl>
3075+
* <dt><b>Scheduler:</b></dt>
3076+
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
3077+
* </dl>
3078+
* @param scheduler the target scheduler where to execute the cancellation
3079+
* @return the new Single instance
3080+
* @throws NullPointerException if scheduler is null
3081+
* @since 2.0.9 - experimental
3082+
*/
3083+
@CheckReturnValue
3084+
@SchedulerSupport(SchedulerSupport.CUSTOM)
3085+
@Experimental
3086+
public final Single<T> unsubscribeOn(final Scheduler scheduler) {
3087+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
3088+
return RxJavaPlugins.onAssembly(new SingleUnsubscribeOn<T>(this, scheduler));
3089+
}
3090+
30703091
/**
30713092
* Returns a Single that emits the result of applying a specified function to the pair of items emitted by
30723093
* the source Single and another specified Single.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import java.util.concurrent.atomic.AtomicReference;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* Makes sure a dispose() call from downstream happens on the specified scheduler.
24+
*
25+
* @param <T> the value type
26+
*/
27+
public final class SingleUnsubscribeOn<T> extends Single<T> {
28+
29+
final SingleSource<T> source;
30+
31+
final Scheduler scheduler;
32+
33+
public SingleUnsubscribeOn(SingleSource<T> source, Scheduler scheduler) {
34+
this.source = source;
35+
this.scheduler = scheduler;
36+
}
37+
38+
@Override
39+
protected void subscribeActual(SingleObserver<? super T> observer) {
40+
source.subscribe(new UnsubscribeOnSingleObserver<T>(observer, scheduler));
41+
}
42+
43+
static final class UnsubscribeOnSingleObserver<T> extends AtomicReference<Disposable>
44+
implements SingleObserver<T>, Disposable, Runnable {
45+
46+
private static final long serialVersionUID = 3256698449646456986L;
47+
48+
final SingleObserver<? super T> actual;
49+
50+
final Scheduler scheduler;
51+
52+
Disposable ds;
53+
54+
UnsubscribeOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
55+
this.actual = actual;
56+
this.scheduler = scheduler;
57+
}
58+
59+
@Override
60+
public void dispose() {
61+
Disposable d = getAndSet(DisposableHelper.DISPOSED);
62+
if (d != DisposableHelper.DISPOSED) {
63+
this.ds = d;
64+
scheduler.scheduleDirect(this);
65+
}
66+
}
67+
68+
@Override
69+
public void run() {
70+
ds.dispose();
71+
}
72+
73+
@Override
74+
public boolean isDisposed() {
75+
return DisposableHelper.isDisposed(get());
76+
}
77+
78+
@Override
79+
public void onSubscribe(Disposable d) {
80+
if (DisposableHelper.setOnce(this, d)) {
81+
actual.onSubscribe(this);
82+
}
83+
}
84+
85+
@Override
86+
public void onSuccess(T value) {
87+
actual.onSuccess(value);
88+
}
89+
90+
@Override
91+
public void onError(Throwable e) {
92+
actual.onError(e);
93+
}
94+
}
95+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import static org.junit.Assert.*;
17+
18+
import java.util.concurrent.*;
19+
20+
import org.junit.Test;
21+
22+
import io.reactivex.*;
23+
import io.reactivex.disposables.Disposable;
24+
import io.reactivex.exceptions.TestException;
25+
import io.reactivex.functions.*;
26+
import io.reactivex.processors.PublishProcessor;
27+
import io.reactivex.schedulers.Schedulers;
28+
29+
public class SingleUnsubscribeOnTest {
30+
31+
@Test
32+
public void normal() throws Exception {
33+
PublishProcessor<Integer> pp = PublishProcessor.create();
34+
35+
final String[] name = { null };
36+
37+
final CountDownLatch cdl = new CountDownLatch(1);
38+
39+
pp.doOnCancel(new Action() {
40+
@Override
41+
public void run() throws Exception {
42+
name[0] = Thread.currentThread().getName();
43+
cdl.countDown();
44+
}
45+
})
46+
.single(-99)
47+
.unsubscribeOn(Schedulers.single())
48+
.test(true)
49+
;
50+
51+
assertTrue(cdl.await(5, TimeUnit.SECONDS));
52+
53+
int times = 10;
54+
55+
while (times-- > 0 && pp.hasSubscribers()) {
56+
Thread.sleep(100);
57+
}
58+
59+
assertFalse(pp.hasSubscribers());
60+
61+
assertNotEquals(Thread.currentThread().getName(), name[0]);
62+
}
63+
64+
@Test
65+
public void just() {
66+
Single.just(1)
67+
.unsubscribeOn(Schedulers.single())
68+
.test()
69+
.assertResult(1);
70+
}
71+
72+
@Test
73+
public void error() {
74+
Single.<Integer>error(new TestException())
75+
.unsubscribeOn(Schedulers.single())
76+
.test()
77+
.assertFailure(TestException.class);
78+
}
79+
80+
@Test
81+
public void dispose() {
82+
TestHelper.checkDisposed(Single.just(1)
83+
.unsubscribeOn(Schedulers.single()));
84+
}
85+
86+
@Test
87+
public void doubleOnSubscribe() {
88+
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Object>, SingleSource<Object>>() {
89+
@Override
90+
public SingleSource<Object> apply(Single<Object> v) throws Exception {
91+
return v.unsubscribeOn(Schedulers.single());
92+
}
93+
});
94+
}
95+
96+
@Test
97+
public void disposeRace() {
98+
for (int i = 0; i < 500; i++) {
99+
PublishProcessor<Integer> pp = PublishProcessor.create();
100+
101+
final Disposable[] ds = { null };
102+
pp.single(-99).unsubscribeOn(Schedulers.computation())
103+
.subscribe(new SingleObserver<Integer>() {
104+
@Override
105+
public void onSubscribe(Disposable d) {
106+
ds[0] = d;
107+
}
108+
109+
@Override
110+
public void onSuccess(Integer value) {
111+
112+
}
113+
114+
@Override
115+
public void onError(Throwable e) {
116+
117+
}
118+
});
119+
120+
Runnable r = new Runnable() {
121+
@Override
122+
public void run() {
123+
ds[0].dispose();
124+
}
125+
};
126+
127+
TestHelper.race(r, r, Schedulers.single());
128+
}
129+
}
130+
}

0 commit comments

Comments
 (0)