Skip to content

Commit c43229b

Browse files
bmaslakovakarnokd
authored andcommitted
implement Maybe.switchIfEmpty(Single) (#5582)
* implement Maybe.switchIfEmpty(Single) * switchIfEmpty(Single) returns single; remove all changes unrelated to the PR * add 'experimental' annotation
1 parent 14bebc5 commit c43229b

File tree

3 files changed

+257
-0
lines changed

3 files changed

+257
-0
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3820,6 +3820,31 @@ public final Maybe<T> switchIfEmpty(MaybeSource<? extends T> other) {
38203820
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmpty<T>(this, other));
38213821
}
38223822

3823+
/**
3824+
* Returns a Single that emits the items emitted by the source Maybe or the item of an alternate
3825+
* SingleSource if the current Maybe is empty.
3826+
* <p>
3827+
* <img width="640" height="445" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchifempty.m.png" alt="">
3828+
* <p/>
3829+
* <dl>
3830+
* <dt><b>Scheduler:</b></dt>
3831+
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
3832+
* </dl>
3833+
*
3834+
* @param other
3835+
* the alternate SingleSource to subscribe to if the main does not emit any items
3836+
* @return a Single that emits the items emitted by the source Maybe or the item of an
3837+
* alternate SingleSource if the source Maybe is empty.
3838+
* @since 2.1.4 - experimental
3839+
*/
3840+
@Experimental
3841+
@CheckReturnValue
3842+
@SchedulerSupport(SchedulerSupport.NONE)
3843+
public final Single<T> switchIfEmpty(SingleSource<? extends T> other) {
3844+
ObjectHelper.requireNonNull(other, "other is null");
3845+
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmptySingle<T>(this, other));
3846+
}
3847+
38233848
/**
38243849
* Returns a Maybe that emits the items emitted by the source Maybe until a second MaybeSource
38253850
* emits an item.
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.fuseable.HasUpstreamMaybeSource;
20+
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
/**
24+
* Subscribes to the other source if the main source is empty.
25+
*
26+
* @param <T> the value type
27+
*/
28+
public final class MaybeSwitchIfEmptySingle<T> extends Single<T> implements HasUpstreamMaybeSource<T> {
29+
30+
final MaybeSource<T> source;
31+
final SingleSource<? extends T> other;
32+
33+
public MaybeSwitchIfEmptySingle(MaybeSource<T> source, SingleSource<? extends T> other) {
34+
this.source = source;
35+
this.other = other;
36+
}
37+
38+
@Override
39+
public MaybeSource<T> source() {
40+
return source;
41+
}
42+
43+
@Override
44+
protected void subscribeActual(SingleObserver<? super T> observer) {
45+
source.subscribe(new SwitchIfEmptyMaybeObserver<T>(observer, other));
46+
}
47+
48+
static final class SwitchIfEmptyMaybeObserver<T>
49+
extends AtomicReference<Disposable>
50+
implements MaybeObserver<T>, Disposable {
51+
52+
private static final long serialVersionUID = 4603919676453758899L;
53+
54+
final SingleObserver<? super T> actual;
55+
56+
final SingleSource<? extends T> other;
57+
58+
SwitchIfEmptyMaybeObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other) {
59+
this.actual = actual;
60+
this.other = other;
61+
}
62+
63+
@Override
64+
public void dispose() {
65+
DisposableHelper.dispose(this);
66+
}
67+
68+
@Override
69+
public boolean isDisposed() {
70+
return DisposableHelper.isDisposed(get());
71+
}
72+
73+
@Override
74+
public void onSubscribe(Disposable d) {
75+
if (DisposableHelper.setOnce(this, d)) {
76+
actual.onSubscribe(this);
77+
}
78+
}
79+
80+
@Override
81+
public void onSuccess(T value) {
82+
actual.onSuccess(value);
83+
}
84+
85+
@Override
86+
public void onError(Throwable e) {
87+
actual.onError(e);
88+
}
89+
90+
@Override
91+
public void onComplete() {
92+
Disposable d = get();
93+
if (d != DisposableHelper.DISPOSED) {
94+
if (compareAndSet(d, null)) {
95+
other.subscribe(new OtherSingleObserver<T>(actual, this));
96+
}
97+
}
98+
}
99+
100+
static final class OtherSingleObserver<T> implements SingleObserver<T> {
101+
102+
final SingleObserver<? super T> actual;
103+
104+
final AtomicReference<Disposable> parent;
105+
OtherSingleObserver(SingleObserver<? super T> actual, AtomicReference<Disposable> parent) {
106+
this.actual = actual;
107+
this.parent = parent;
108+
}
109+
@Override
110+
public void onSubscribe(Disposable d) {
111+
DisposableHelper.setOnce(parent, d);
112+
}
113+
@Override
114+
public void onSuccess(T value) {
115+
actual.onSuccess(value);
116+
}
117+
@Override
118+
public void onError(Throwable e) {
119+
actual.onError(e);
120+
}
121+
}
122+
123+
}
124+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.Single;
18+
import io.reactivex.TestHelper;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Function;
21+
import io.reactivex.observers.TestObserver;
22+
import io.reactivex.processors.PublishProcessor;
23+
import io.reactivex.schedulers.Schedulers;
24+
import org.junit.Test;
25+
26+
import static org.junit.Assert.assertFalse;
27+
import static org.junit.Assert.assertTrue;
28+
29+
public class MaybeSwitchIfEmptySingleTest {
30+
31+
@Test
32+
public void nonEmpty() {
33+
Maybe.just(1).switchIfEmpty(Single.just(2)).test().assertResult(1);
34+
}
35+
36+
@Test
37+
public void empty() {
38+
Maybe.<Integer>empty().switchIfEmpty(Single.just(2)).test().assertResult(2);
39+
}
40+
41+
@Test
42+
public void error() {
43+
Maybe.<Integer>error(new TestException()).switchIfEmpty(Single.just(2))
44+
.test().assertFailure(TestException.class);
45+
}
46+
47+
@Test
48+
public void errorOther() {
49+
Maybe.empty().switchIfEmpty(Single.<Integer>error(new TestException()))
50+
.test().assertFailure(TestException.class);
51+
}
52+
53+
@Test
54+
public void dispose() {
55+
PublishProcessor<Integer> pp = PublishProcessor.create();
56+
57+
TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();
58+
59+
assertTrue(pp.hasSubscribers());
60+
61+
ts.cancel();
62+
63+
assertFalse(pp.hasSubscribers());
64+
}
65+
66+
67+
@Test
68+
public void isDisposed() {
69+
PublishProcessor<Integer> pp = PublishProcessor.create();
70+
71+
TestHelper.checkDisposed(pp.singleElement().switchIfEmpty(Single.just(2)));
72+
}
73+
74+
@Test
75+
public void doubleOnSubscribe() {
76+
TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function<Maybe<Integer>, Single<Integer>>() {
77+
@Override
78+
public Single<Integer> apply(Maybe<Integer> f) throws Exception {
79+
return f.switchIfEmpty(Single.just(2));
80+
}
81+
});
82+
}
83+
84+
@Test
85+
public void emptyCancelRace() {
86+
for (int i = 0; i < 500; i++) {
87+
final PublishProcessor<Integer> pp = PublishProcessor.create();
88+
89+
final TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();
90+
91+
Runnable r1 = new Runnable() {
92+
@Override
93+
public void run() {
94+
pp.onComplete();
95+
}
96+
};
97+
98+
Runnable r2 = new Runnable() {
99+
@Override
100+
public void run() {
101+
ts.cancel();
102+
}
103+
};
104+
105+
TestHelper.race(r1, r2, Schedulers.single());
106+
}
107+
}
108+
}

0 commit comments

Comments
 (0)