Skip to content

Commit 7e913fa

Browse files
vanniktechakarnokd
authored andcommitted
1.x: Single.flatMapCompletable (#4226)
* 1.x: Single.flatMapCompletable * Add documentation and CompletableFlatMapSingleToCompletable * Add Generic * Add ? extends Completable * Add @experimental * Add Java Doc link * Change test for artem
1 parent 1e147fb commit 7e913fa

File tree

3 files changed

+179
-0
lines changed

3 files changed

+179
-0
lines changed

src/main/java/rx/Single.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,6 +1396,29 @@ public final <R> Observable<R> flatMapObservable(Func1<? super T, ? extends Obse
13961396
return Observable.merge(asObservable(map(func)));
13971397
}
13981398

1399+
/**
1400+
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
1401+
* source {@link Completable}, where that function returns a {@link Completable}.
1402+
* <p>
1403+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapCompletable.png" alt="">
1404+
* <dl>
1405+
* <dt><b>Scheduler:</b></dt>
1406+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
1407+
* </dl>
1408+
*
1409+
* @param func
1410+
* a function that, when applied to the item emitted by the source Single, returns a
1411+
* Completable
1412+
* @return the Completable returned from {@code func} when applied to the item emitted by the source Single
1413+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1414+
* @Experimental The behavior of this can change at any time.
1415+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
1416+
*/
1417+
@Experimental
1418+
public final Completable flatMapCompletable(final Func1<? super T, ? extends Completable> func) {
1419+
return Completable.create(new CompletableFlatMapSingleToCompletable<T>(this, func));
1420+
}
1421+
13991422
/**
14001423
* Returns a Single that applies a specified function to the item emitted by the source Single and
14011424
* emits the result of this function application.
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.Completable;
20+
import rx.Completable.CompletableOnSubscribe;
21+
import rx.Completable.CompletableSubscriber;
22+
import rx.Single;
23+
import rx.SingleSubscriber;
24+
import rx.Subscription;
25+
import rx.exceptions.Exceptions;
26+
import rx.functions.Func1;
27+
28+
public final class CompletableFlatMapSingleToCompletable<T> implements CompletableOnSubscribe {
29+
30+
final Single<T> source;
31+
32+
final Func1<? super T, ? extends Completable> mapper;
33+
34+
public CompletableFlatMapSingleToCompletable(Single<T> source, Func1<? super T, ? extends Completable> mapper) {
35+
this.source = source;
36+
this.mapper = mapper;
37+
}
38+
39+
@Override
40+
public void call(CompletableSubscriber t) {
41+
SourceSubscriber<T> parent = new SourceSubscriber<T>(t, mapper);
42+
t.onSubscribe(parent);
43+
source.subscribe(parent);
44+
}
45+
46+
static final class SourceSubscriber<T> extends SingleSubscriber<T> implements CompletableSubscriber {
47+
final CompletableSubscriber actual;
48+
49+
final Func1<? super T, ? extends Completable> mapper;
50+
51+
public SourceSubscriber(CompletableSubscriber actual, Func1<? super T, ? extends Completable> mapper) {
52+
this.actual = actual;
53+
this.mapper = mapper;
54+
}
55+
56+
@Override
57+
public void onSuccess(T value) {
58+
Completable c;
59+
60+
try {
61+
c = mapper.call(value);
62+
} catch (Throwable ex) {
63+
Exceptions.throwIfFatal(ex);
64+
onError(ex);
65+
return;
66+
}
67+
68+
if (c == null) {
69+
onError(new NullPointerException("The mapper returned a null Completable"));
70+
return;
71+
}
72+
73+
c.subscribe(this);
74+
}
75+
76+
@Override
77+
public void onError(Throwable error) {
78+
actual.onError(error);
79+
}
80+
81+
@Override
82+
public void onCompleted() {
83+
actual.onCompleted();
84+
}
85+
86+
@Override
87+
public void onSubscribe(Subscription d) {
88+
add(d);
89+
}
90+
}
91+
92+
}

src/test/java/rx/SingleTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2005,4 +2005,68 @@ public Single<Integer> call(Integer v) {
20052005
assertFalse("Observers present?!", ps.hasObservers());
20062006
}
20072007

2008+
@Test
2009+
public void flatMapCompletableComplete() {
2010+
final AtomicInteger atomicInteger = new AtomicInteger();
2011+
TestSubscriber testSubscriber = TestSubscriber.create();
2012+
2013+
Single.just(1).flatMapCompletable(new Func1<Integer, Completable>() {
2014+
@Override
2015+
public Completable call(final Integer integer) {
2016+
return Completable.fromAction(new Action0() {
2017+
@Override
2018+
public void call() {
2019+
atomicInteger.set(5);
2020+
}
2021+
});
2022+
}
2023+
}).subscribe(testSubscriber);
2024+
2025+
testSubscriber.assertCompleted();
2026+
2027+
assertEquals(5, atomicInteger.get());
2028+
}
2029+
2030+
@Test
2031+
public void flatMapCompletableError() {
2032+
final RuntimeException error = new RuntimeException("some error");
2033+
TestSubscriber testSubscriber = TestSubscriber.create();
2034+
2035+
Single.just(1).flatMapCompletable(new Func1<Integer, Completable>() {
2036+
@Override
2037+
public Completable call(final Integer integer) {
2038+
return Completable.error(error);
2039+
}
2040+
}).subscribe(testSubscriber);
2041+
2042+
testSubscriber.assertError(error);
2043+
}
2044+
2045+
@Test
2046+
public void flatMapCompletableNullCompletable() {
2047+
TestSubscriber testSubscriber = TestSubscriber.create();
2048+
2049+
Single.just(1).flatMapCompletable(new Func1<Integer, Completable>() {
2050+
@Override
2051+
public Completable call(final Integer integer) {
2052+
return null;
2053+
}
2054+
}).subscribe(testSubscriber);
2055+
2056+
testSubscriber.assertError(NullPointerException.class);
2057+
}
2058+
2059+
@Test
2060+
public void flatMapCompletableException() {
2061+
TestSubscriber testSubscriber = TestSubscriber.create();
2062+
2063+
Single.just(1).flatMapCompletable(new Func1<Integer, Completable>() {
2064+
@Override
2065+
public Completable call(final Integer integer) {
2066+
throw new UnsupportedOperationException();
2067+
}
2068+
}).subscribe(testSubscriber);
2069+
2070+
testSubscriber.assertError(UnsupportedOperationException.class);
2071+
}
20082072
}

0 commit comments

Comments
 (0)