Skip to content

Commit c307e4f

Browse files
vanniktechakarnokd
authored andcommitted
1.x: Single add doOnEach (#4461)
* 1.x: Single add doOnEach * Add JavaDoc
1 parent 11343ae commit c307e4f

File tree

4 files changed

+192
-24
lines changed

4 files changed

+192
-24
lines changed

src/main/java/rx/Single.java

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2406,27 +2406,54 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other, Func2<? super
24062406
* the action to invoke if the source {@link Single} calls {@code onError}
24072407
* @return the source {@link Single} with the side-effecting behavior applied
24082408
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2409+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
24092410
*/
24102411
@Experimental
24112412
public final Single<T> doOnError(final Action1<Throwable> onError) {
2412-
Observer<T> observer = new Observer<T>() {
2413+
if (onError == null) {
2414+
throw new IllegalArgumentException("onError is null");
2415+
}
2416+
2417+
return Single.create(new SingleDoOnEvent<T>(this, Actions.empty(), new Action1<Throwable>() {
24132418
@Override
2414-
public void onCompleted() {
2415-
// deliberately ignored
2419+
public void call(final Throwable throwable) {
2420+
onError.call(throwable);
24162421
}
2422+
}));
2423+
}
24172424

2425+
/**
2426+
* Modifies the source {@link Single} so that it invokes an action when it calls {@code onSuccess} or {@code onError}.
2427+
* <p>
2428+
* <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
2429+
* <dl>
2430+
* <dt><b>Scheduler:</b></dt>
2431+
* <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
2432+
* </dl>
2433+
*
2434+
* @param onNotification
2435+
* the action to invoke when the source {@link Single} calls {@code onSuccess} or {@code onError}.
2436+
* @return the source {@link Single} with the side-effecting behavior applied
2437+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2438+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2439+
*/
2440+
@Experimental
2441+
public final Single<T> doOnEach(final Action1<Notification<? extends T>> onNotification) {
2442+
if (onNotification == null) {
2443+
throw new IllegalArgumentException("onNotification is null");
2444+
}
2445+
2446+
return Single.create(new SingleDoOnEvent<T>(this, new Action1<T>() {
24182447
@Override
2419-
public void onError(Throwable e) {
2420-
onError.call(e);
2448+
public void call(final T t) {
2449+
onNotification.call(Notification.<T>createOnNext(t));
24212450
}
2422-
2451+
}, new Action1<Throwable>() {
24232452
@Override
2424-
public void onNext(T t) {
2425-
// deliberately ignored
2453+
public void call(final Throwable throwable) {
2454+
onNotification.call(Notification.<T>createOnError(throwable));
24262455
}
2427-
};
2428-
2429-
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
2456+
}));
24302457
}
24312458

24322459
/**
@@ -2442,27 +2469,25 @@ public void onNext(T t) {
24422469
* the action to invoke when the source {@link Single} calls {@code onSuccess}
24432470
* @return the source {@link Single} with the side-effecting behavior applied
24442471
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
2472+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
24452473
*/
24462474
@Experimental
24472475
public final Single<T> doOnSuccess(final Action1<? super T> onSuccess) {
2448-
Observer<T> observer = new Observer<T>() {
2449-
@Override
2450-
public void onCompleted() {
2451-
// deliberately ignored
2452-
}
2476+
if (onSuccess == null) {
2477+
throw new IllegalArgumentException("onSuccess is null");
2478+
}
24532479

2480+
return Single.create(new SingleDoOnEvent<T>(this, new Action1<T>() {
24542481
@Override
2455-
public void onError(Throwable e) {
2456-
// deliberately ignored
2482+
public void call(final T t) {
2483+
onSuccess.call(t);
24572484
}
2458-
2485+
}, new Action1<Throwable>() {
24592486
@Override
2460-
public void onNext(T t) {
2461-
onSuccess.call(t);
2487+
public void call(final Throwable throwable) {
2488+
// Do nothing.
24622489
}
2463-
};
2464-
2465-
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
2490+
}));
24662491
}
24672492

24682493
/**

src/main/java/rx/exceptions/Exceptions.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,20 @@ public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
190190
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
191191
}
192192

193+
/**
194+
* Forwards a fatal exception or reports it along with the value
195+
* caused it to the given SingleSubscriber.
196+
* @param t the exception
197+
* @param o the observer to report to
198+
* @param value the value that caused the exception
199+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
200+
*/
201+
@Experimental
202+
public static void throwOrReport(Throwable t, SingleSubscriber<?> o, Object value) {
203+
Exceptions.throwIfFatal(t);
204+
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
205+
}
206+
193207
/**
194208
* Forwards a fatal exception or reports it to the given Observer.
195209
* @param t the exception
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.Single;
20+
import rx.SingleSubscriber;
21+
import rx.exceptions.CompositeException;
22+
import rx.exceptions.Exceptions;
23+
import rx.functions.Action1;
24+
25+
public final class SingleDoOnEvent<T> implements Single.OnSubscribe<T> {
26+
final Single<T> source;
27+
final Action1<? super T> onSuccess;
28+
final Action1<Throwable> onError;
29+
30+
public SingleDoOnEvent(Single<T> source, Action1<? super T> onSuccess, Action1<Throwable> onError) {
31+
this.source = source;
32+
this.onSuccess = onSuccess;
33+
this.onError = onError;
34+
}
35+
36+
@Override
37+
public void call(SingleSubscriber<? super T> actual) {
38+
SingleDoOnEventSubscriber<T> parent = new SingleDoOnEventSubscriber<T>(actual, onSuccess, onError);
39+
actual.add(parent);
40+
source.subscribe(parent);
41+
}
42+
43+
static final class SingleDoOnEventSubscriber<T> extends SingleSubscriber<T> {
44+
final SingleSubscriber<? super T> actual;
45+
final Action1<? super T> onSuccess;
46+
final Action1<Throwable> onError;
47+
48+
SingleDoOnEventSubscriber(SingleSubscriber<? super T> actual, Action1<? super T> onSuccess, Action1<Throwable> onError) {
49+
this.actual = actual;
50+
this.onSuccess = onSuccess;
51+
this.onError = onError;
52+
}
53+
54+
@Override
55+
public void onSuccess(T value) {
56+
try {
57+
onSuccess.call(value);
58+
} catch (Throwable e) {
59+
Exceptions.throwOrReport(e, this, value);
60+
return;
61+
}
62+
63+
actual.onSuccess(value);
64+
}
65+
66+
@Override
67+
public void onError(Throwable error) {
68+
try {
69+
onError.call(error);
70+
} catch (Throwable e) {
71+
Exceptions.throwIfFatal(e);
72+
actual.onError(new CompositeException(error, e));
73+
return;
74+
}
75+
76+
actual.onError(error);
77+
}
78+
}
79+
}

src/test/java/rx/SingleTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,11 @@ public void toCompletableError() {
869869
testSubscriber.assertNotCompleted();
870870
}
871871

872+
@Test(expected = IllegalArgumentException.class)
873+
public void doOnErrorNull() {
874+
Single.just(1).doOnError(null);
875+
}
876+
872877
@Test
873878
public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() {
874879
@SuppressWarnings("unchecked")
@@ -974,6 +979,11 @@ public void shouldPassErrorFromCallable() throws Exception {
974979
verify(callable).call();
975980
}
976981

982+
@Test(expected = IllegalArgumentException.class)
983+
public void doOnSuccessNull() {
984+
Single.just(1).doOnSuccess(null);
985+
}
986+
977987
@Test
978988
public void doOnSuccessShouldInvokeAction() {
979989
@SuppressWarnings("unchecked")
@@ -2086,4 +2096,44 @@ public Object call(Single<String> single) {
20862096
assertSame(expectedResult, actualResult);
20872097
assertSame(s, singleRef.get());
20882098
}
2099+
2100+
@Test(expected = IllegalArgumentException.class)
2101+
public void doOnEachNull() {
2102+
Single.just(1).doOnEach(null);
2103+
}
2104+
2105+
@Test
2106+
public void doOnEachError() {
2107+
final AtomicInteger atomicInteger = new AtomicInteger(0);
2108+
Single.error(new RuntimeException()).doOnEach(new Action1<Notification<?>>() {
2109+
@Override
2110+
public void call(final Notification<?> notification) {
2111+
if (notification.isOnError()) {
2112+
atomicInteger.incrementAndGet();
2113+
}
2114+
}
2115+
}).subscribe(Actions.empty(), new Action1<Throwable>() {
2116+
@Override
2117+
public void call(final Throwable throwable) {
2118+
// Do nothing this is expected.
2119+
}
2120+
});
2121+
2122+
assertEquals(1, atomicInteger.get());
2123+
}
2124+
2125+
@Test
2126+
public void doOnEachSuccess() {
2127+
final AtomicInteger atomicInteger = new AtomicInteger(0);
2128+
Single.just(1).doOnEach(new Action1<Notification<? extends Integer>>() {
2129+
@Override
2130+
public void call(final Notification<? extends Integer> notification) {
2131+
if (notification.isOnNext()) {
2132+
atomicInteger.getAndAdd(notification.getValue());
2133+
}
2134+
}
2135+
}).subscribe();
2136+
2137+
assertEquals(1, atomicInteger.get());
2138+
}
20892139
}

0 commit comments

Comments
 (0)