Skip to content

Commit 7e0f6d8

Browse files
authored
2.x: add subscribeWith to get back the subscriber/observer fluently (#4422)
1 parent 7aa0b34 commit 7e0f6d8

File tree

5 files changed

+178
-2
lines changed

5 files changed

+178
-2
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1413,6 +1413,31 @@ public final void subscribe(CompletableObserver s) {
14131413
*/
14141414
protected abstract void subscribeActual(CompletableObserver s);
14151415

1416+
/**
1417+
* Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
1418+
* CompletableObserver as is.
1419+
* <p>Usage example:
1420+
* <pre><code>
1421+
* Completable<Integer> source = Completable.complete().delay(1, TimeUnit.SECONDS);
1422+
* CompositeDisposable composite = new CompositeDisposable();
1423+
*
1424+
* class ResourceCompletableObserver implements CompletableObserver, Disposable {
1425+
* // ...
1426+
* }
1427+
*
1428+
* composite.add(source.subscribeWith(new ResourceCompletableObserver()));
1429+
* </code></pre>
1430+
* @param <E> the type of the CompletableObserver to use and return
1431+
* @param observer the CompletableObserver (subclass) to use and return, not null
1432+
* @return the input {@code observer}
1433+
* @throws NullPointerException if {@code observer} is null
1434+
* @since 2.0
1435+
*/
1436+
public final <E extends CompletableObserver> E subscribeWith(E observer) {
1437+
subscribe(observer);
1438+
return observer;
1439+
}
1440+
14161441
/**
14171442
* Subscribes to this Completable and calls back either the onError or onComplete functions.
14181443
* <dl>

src/main/java/io/reactivex/Flowable.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11709,7 +11709,32 @@ public final void subscribe(Subscriber<? super T> s) {
1170911709
* @param s the incoming Subscriber, never null
1171011710
*/
1171111711
protected abstract void subscribeActual(Subscriber<? super T> s);
11712-
11712+
11713+
/**
11714+
* Subscribes a given Subscriber (subclass) to this Flowable and returns the given
11715+
* Subscriber as is.
11716+
* <p>Usage example:
11717+
* <pre><code>
11718+
* Flowable<Integer> source = Flowable.range(1, 10);
11719+
* CompositeDisposable composite = new CompositeDisposable();
11720+
*
11721+
* ResourceSubscriber&lt;Integer> rs = new ResourceSubscriber&lt;>() {
11722+
* // ...
11723+
* };
11724+
*
11725+
* composite.add(source.subscribeWith(rs));
11726+
* </code></pre>
11727+
* @param <E> the type of the Subscriber to use and return
11728+
* @param subscriber the Subscriber (subclass) to use and return, not null
11729+
* @return the input {@code subscriber}
11730+
* @throws NullPointerException if {@code subscriber} is null
11731+
* @since 2.0
11732+
*/
11733+
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
11734+
subscribe(subscriber);
11735+
return subscriber;
11736+
}
11737+
1171311738
/**
1171411739
* Asynchronously subscribes Observers to this Publisher on the specified {@link Scheduler}.
1171511740
* <p>

src/main/java/io/reactivex/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9886,6 +9886,31 @@ public final void subscribe(Observer<? super T> observer) {
98869886
*/
98879887
protected abstract void subscribeActual(Observer<? super T> observer);
98889888

9889+
/**
9890+
* Subscribes a given Observer (subclass) to this Observable and returns the given
9891+
* Observer as is.
9892+
* <p>Usage example:
9893+
* <pre><code>
9894+
* Observable<Integer> source = Observable.range(1, 10);
9895+
* CompositeDisposable composite = new CompositeDisposable();
9896+
*
9897+
* ResourceObserver&lt;Integer> rs = new ResourceSubscriber&lt;>() {
9898+
* // ...
9899+
* };
9900+
*
9901+
* composite.add(source.subscribeWith(rs));
9902+
* </code></pre>
9903+
* @param <E> the type of the Observer to use and return
9904+
* @param observer the Observer (subclass) to use and return, not null
9905+
* @return the input {@code observer}
9906+
* @throws NullPointerException if {@code observer} is null
9907+
* @since 2.0
9908+
*/
9909+
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
9910+
subscribe(observer);
9911+
return observer;
9912+
}
9913+
98899914
/**
98909915
* Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
98919916
* <p>

src/main/java/io/reactivex/Single.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2156,7 +2156,32 @@ public final void subscribe(SingleObserver<? super T> subscriber) {
21562156
* @param observer the SingleObserver to handle, not null
21572157
*/
21582158
protected abstract void subscribeActual(SingleObserver<? super T> observer);
2159-
2159+
2160+
/**
2161+
* Subscribes a given SingleObserver (subclass) to this Single and returns the given
2162+
* SingleObserver as is.
2163+
* <p>Usage example:
2164+
* <pre><code>
2165+
* Single<Integer> source = Single.just(1);
2166+
* CompositeDisposable composite = new CompositeDisposable();
2167+
*
2168+
* class ResourceSingleObserver implements SingleObserver&lt;Integer>, Disposable {
2169+
* // ...
2170+
* }
2171+
*
2172+
* composite.add(source.subscribeWith(new ResourceSingleObserver()));
2173+
* </code></pre>
2174+
* @param <E> the type of the SingleObserver to use and return
2175+
* @param observer the SingleObserver (subclass) to use and return, not null
2176+
* @return the input {@code observer}
2177+
* @throws NullPointerException if {@code observer} is null
2178+
* @since 2.0
2179+
*/
2180+
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
2181+
subscribe(observer);
2182+
return observer;
2183+
}
2184+
21602185
/**
21612186
* Asynchronously subscribes subscribers to this Single on the specified {@link Scheduler}.
21622187
* <p>
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal;
15+
16+
import org.junit.Test;
17+
import static org.junit.Assert.*;
18+
19+
import io.reactivex.*;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.observers.TestObserver;
22+
import io.reactivex.subscribers.TestSubscriber;
23+
24+
public class SubscribeWithTest {
25+
26+
@Test
27+
public void withFlowable() {
28+
Flowable.range(1, 10)
29+
.subscribeWith(new TestSubscriber<Integer>())
30+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
31+
}
32+
33+
34+
@Test
35+
public void withObservable() {
36+
Observable.range(1, 10)
37+
.subscribeWith(new TestObserver<Integer>())
38+
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
39+
}
40+
41+
42+
class ObserverImpl implements SingleObserver<Object>, CompletableObserver {
43+
Object value;
44+
45+
@Override
46+
public void onSubscribe(Disposable d) {
47+
48+
}
49+
50+
@Override
51+
public void onComplete() {
52+
this.value = 100;
53+
}
54+
55+
@Override
56+
public void onSuccess(Object value) {
57+
this.value = value;
58+
}
59+
60+
@Override
61+
public void onError(Throwable e) {
62+
this.value = e;
63+
}
64+
}
65+
66+
@Test
67+
public void withSingle() {
68+
assertEquals(1, Single.just(1).subscribeWith(new ObserverImpl()).value);
69+
}
70+
71+
@Test
72+
public void withCompletable() {
73+
assertEquals(100, Completable.complete().subscribeWith(new ObserverImpl()).value);
74+
}
75+
76+
}

0 commit comments

Comments
 (0)