Skip to content

Commit eddc153

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add doOnEvent to Single & Completable (#4479)
1 parent 12ca280 commit eddc153

File tree

6 files changed

+186
-1
lines changed

6 files changed

+186
-1
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,33 @@ public final Completable doOnError(Consumer<? super Throwable> onError) {
984984
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
985985
}
986986

987+
/**
988+
* Returns a Completable which calls the given onEvent callback with the (throwable) for an onError
989+
* or (null) for an onComplete signal from this Completable before delivering said signal to the downstream.
990+
* <dl>
991+
* <dt><b>Scheduler:</b></dt>
992+
* <dd>{@code doOnEvent} does not operate by default on a particular {@link Scheduler}.</dd>
993+
* </dl>
994+
* @param onEvent the event callback
995+
* @return the new Completable instance
996+
* @throws NullPointerException if onEvent is null
997+
*/
998+
@SchedulerSupport(SchedulerSupport.NONE)
999+
public final Completable doOnEvent(final Consumer<? super Throwable> onEvent) {
1000+
ObjectHelper.requireNonNull(onEvent, "onEvent is null");
1001+
return doOnLifecycle(Functions.emptyConsumer(), new Consumer<Throwable>() {
1002+
@Override
1003+
public void accept(final Throwable throwable) throws Exception {
1004+
onEvent.accept(throwable);
1005+
}
1006+
}, new Action() {
1007+
@Override
1008+
public void run() throws Exception {
1009+
onEvent.accept(null);
1010+
}
1011+
}, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
1012+
}
1013+
9871014
/**
9881015
* Returns a Completable instance that calls the various callbacks on the specific
9891016
* lifecycle events.

src/main/java/io/reactivex/Maybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -859,7 +859,7 @@ public final Maybe<T> doOnError(Consumer<? super Throwable> onError) {
859859
}
860860

861861
/**
862-
* Calls the given onEvent callback with the (success value, null) for an onSuccess, (null, throwabe) for
862+
* Calls the given onEvent callback with the (success value, null) for an onSuccess, (null, throwable) for
863863
* an onError or (null, null) for an onComplete signal from this Maybe before delivering said
864864
* signal to the downstream.
865865
* <p>

src/main/java/io/reactivex/Single.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1591,6 +1591,22 @@ public final Single<T> doOnSuccess(final Consumer<? super T> onSuccess) {
15911591
return RxJavaPlugins.onAssembly(new SingleDoOnSuccess<T>(this, onSuccess));
15921592
}
15931593

1594+
/**
1595+
* Calls the shared consumer with the error sent via onError or the value
1596+
* via onSuccess for each SingleObserver that subscribes to the current Single.
1597+
* <dl>
1598+
* <dt><b>Scheduler:</b></dt>
1599+
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
1600+
* </dl>
1601+
* @param onEvent the consumer called with the success value of onEvent
1602+
* @return the new Single instance
1603+
* @since 2.0
1604+
*/
1605+
public final Single<T> doOnEvent(final BiConsumer<? super T, ? super Throwable> onEvent) {
1606+
ObjectHelper.requireNonNull(onEvent, "onEvent is null");
1607+
return RxJavaPlugins.onAssembly(new SingleDoOnEvent<T>(this, onEvent));
1608+
}
1609+
15941610
/**
15951611
* Calls the shared consumer with the error sent via onError for each
15961612
* SingleObserver that subscribes to the current Single.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Single;
17+
import io.reactivex.SingleObserver;
18+
import io.reactivex.SingleSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.CompositeException;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.BiConsumer;
23+
24+
public final class SingleDoOnEvent<T> extends Single<T> {
25+
final SingleSource<T> source;
26+
27+
final BiConsumer<? super T, ? super Throwable> onEvent;
28+
29+
public SingleDoOnEvent(SingleSource<T> source, BiConsumer<? super T, ? super Throwable> onEvent) {
30+
this.source = source;
31+
this.onEvent = onEvent;
32+
}
33+
34+
@Override
35+
protected void subscribeActual(final SingleObserver<? super T> s) {
36+
37+
source.subscribe(new SingleObserver<T>() {
38+
@Override
39+
public void onSubscribe(Disposable d) {
40+
s.onSubscribe(d);
41+
}
42+
43+
@Override
44+
public void onSuccess(T value) {
45+
try {
46+
onEvent.accept(value, null);
47+
} catch (Throwable ex) {
48+
Exceptions.throwIfFatal(ex);
49+
s.onError(ex);
50+
return;
51+
}
52+
53+
s.onSuccess(value);
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
try {
59+
onEvent.accept(null, e);
60+
} catch (Throwable ex) {
61+
Exceptions.throwIfFatal(ex);
62+
e = new CompositeException(ex, e);
63+
}
64+
s.onError(e);
65+
}
66+
});
67+
}
68+
}

src/test/java/io/reactivex/completable/CompletableTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4463,4 +4463,41 @@ public void run() {
44634463
exec.shutdown();
44644464
}
44654465
}
4466+
4467+
@Test(expected = NullPointerException.class)
4468+
public void doOnEventNullEvent() {
4469+
Completable.complete().doOnEvent(null);
4470+
}
4471+
4472+
@Test
4473+
public void doOnEventComplete() {
4474+
final AtomicInteger atomicInteger = new AtomicInteger(0);
4475+
4476+
Completable.complete().doOnEvent(new Consumer<Throwable>() {
4477+
@Override
4478+
public void accept(final Throwable throwable) throws Exception {
4479+
if (throwable == null) {
4480+
atomicInteger.incrementAndGet();
4481+
}
4482+
}
4483+
}).subscribe();
4484+
4485+
assertEquals(1, atomicInteger.get());
4486+
}
4487+
4488+
@Test
4489+
public void doOnEventError() {
4490+
final AtomicInteger atomicInteger = new AtomicInteger(0);
4491+
4492+
Completable.error(new RuntimeException()).doOnEvent(new Consumer<Throwable>() {
4493+
@Override
4494+
public void accept(final Throwable throwable) throws Exception {
4495+
if (throwable != null) {
4496+
atomicInteger.incrementAndGet();
4497+
}
4498+
}
4499+
}).subscribe();
4500+
4501+
assertEquals(1, atomicInteger.get());
4502+
}
44664503
}

src/test/java/io/reactivex/single/SingleTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -452,5 +452,42 @@ public void testToObservable() {
452452
ts.assertNoErrors();
453453
ts.assertComplete();
454454
}
455+
456+
@Test(expected = NullPointerException.class)
457+
public void doOnEventNullEvent() {
458+
Single.just(1).doOnEvent(null);
459+
}
460+
461+
@Test
462+
public void doOnEventComplete() {
463+
final AtomicInteger atomicInteger = new AtomicInteger(0);
464+
465+
Single.just(1).doOnEvent(new BiConsumer<Integer, Throwable>() {
466+
@Override
467+
public void accept(final Integer integer, final Throwable throwable) throws Exception {
468+
if (integer != null) {
469+
atomicInteger.incrementAndGet();
470+
}
471+
}
472+
}).subscribe();
473+
474+
assertEquals(1, atomicInteger.get());
475+
}
476+
477+
@Test
478+
public void doOnEventError() {
479+
final AtomicInteger atomicInteger = new AtomicInteger(0);
480+
481+
Single.error(new RuntimeException()).doOnEvent(new BiConsumer<Object, Throwable>() {
482+
@Override
483+
public void accept(final Object o, final Throwable throwable) throws Exception {
484+
if (throwable != null) {
485+
atomicInteger.incrementAndGet();
486+
}
487+
}
488+
}).subscribe();
489+
490+
assertEquals(1, atomicInteger.get());
491+
}
455492
}
456493

0 commit comments

Comments
 (0)