Skip to content

Commit b6c13f7

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Completable enhance doOnEvent to reduce allocations (#4486)
1 parent eddc153 commit b6c13f7

File tree

3 files changed

+104
-17
lines changed

3 files changed

+104
-17
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -948,7 +948,7 @@ public final Completable doOnComplete(Action onComplete) {
948948
onComplete, Functions.EMPTY_ACTION,
949949
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
950950
}
951-
951+
952952
/**
953953
* Returns a Completable which calls the given onDispose callback if the child subscriber cancels
954954
* the subscription.
@@ -966,7 +966,7 @@ public final Completable doOnDispose(Action onDispose) {
966966
Functions.EMPTY_ACTION, Functions.EMPTY_ACTION,
967967
Functions.EMPTY_ACTION, onDispose);
968968
}
969-
969+
970970
/**
971971
* Returns a Completable which calls the given onError callback if this Completable emits an error.
972972
* <dl>
@@ -998,17 +998,7 @@ public final Completable doOnError(Consumer<? super Throwable> onError) {
998998
@SchedulerSupport(SchedulerSupport.NONE)
999999
public final Completable doOnEvent(final Consumer<? super Throwable> onEvent) {
10001000
ObjectHelper.requireNonNull(onEvent, "onEvent is null");
1001-
return doOnLifecycle(Functions.emptyConsumer(), new Consumer<Throwable>() {
1002-
@Override
1003-
public void accept(final Throwable throwable) throws Exception {
1004-
onEvent.accept(throwable);
1005-
}
1006-
}, new Action() {
1007-
@Override
1008-
public void run() throws Exception {
1009-
onEvent.accept(null);
1010-
}
1011-
}, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
1001+
return RxJavaPlugins.onAssembly(new CompletableDoOnEvent(this, onEvent));
10121002
}
10131003

10141004
/**
@@ -1027,9 +1017,9 @@ public void run() throws Exception {
10271017
*/
10281018
@SchedulerSupport(SchedulerSupport.NONE)
10291019
private Completable doOnLifecycle(
1030-
final Consumer<? super Disposable> onSubscribe,
1031-
final Consumer<? super Throwable> onError,
1032-
final Action onComplete,
1020+
final Consumer<? super Disposable> onSubscribe,
1021+
final Consumer<? super Throwable> onError,
1022+
final Action onComplete,
10331023
final Action onTerminate,
10341024
final Action onAfterTerminate,
10351025
final Action onDispose) {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.CompletableObserver;
18+
import io.reactivex.CompletableSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.CompositeException;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.Consumer;
23+
24+
public final class CompletableDoOnEvent extends Completable {
25+
final CompletableSource source;
26+
final Consumer<? super Throwable> onEvent;
27+
28+
public CompletableDoOnEvent(final CompletableSource source, final Consumer<? super Throwable> onEvent) {
29+
this.source = source;
30+
this.onEvent = onEvent;
31+
}
32+
33+
@Override
34+
protected void subscribeActual(final CompletableObserver s) {
35+
source.subscribe(new CompletableObserver() {
36+
@Override
37+
public void onComplete() {
38+
try {
39+
onEvent.accept(null);
40+
} catch (Throwable e) {
41+
Exceptions.throwIfFatal(e);
42+
s.onError(e);
43+
return;
44+
}
45+
46+
s.onComplete();
47+
}
48+
49+
@Override
50+
public void onError(Throwable e) {
51+
try {
52+
onEvent.accept(e);
53+
} catch (Throwable ex) {
54+
Exceptions.throwIfFatal(ex);
55+
e = new CompositeException(ex, e);
56+
}
57+
58+
s.onError(e);
59+
}
60+
61+
@Override
62+
public void onSubscribe(final Disposable d) {
63+
s.onSubscribe(d);
64+
}
65+
});
66+
}
67+
}

src/test/java/io/reactivex/completable/CompletableTest.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4465,7 +4465,37 @@ public void run() {
44654465
}
44664466

44674467
@Test(expected = NullPointerException.class)
4468-
public void doOnEventNullEvent() {
4468+
public void doOnErrorNullValue() {
4469+
Completable.complete().doOnError(null);
4470+
}
4471+
4472+
@Test(expected = NullPointerException.class)
4473+
public void doOnSubscribeNullValue() {
4474+
Completable.complete().doOnSubscribe(null);
4475+
}
4476+
4477+
@Test(expected = NullPointerException.class)
4478+
public void doAfterTerminateNullValue() {
4479+
Completable.complete().doAfterTerminate(null);
4480+
}
4481+
4482+
@Test(expected = NullPointerException.class)
4483+
public void doOnTerminateNullValue() {
4484+
Completable.complete().doOnTerminate(null);
4485+
}
4486+
4487+
@Test(expected = NullPointerException.class)
4488+
public void doOnCompleteNullValue() {
4489+
Completable.complete().doOnComplete(null);
4490+
}
4491+
4492+
@Test(expected = NullPointerException.class)
4493+
public void doOnDisposeNullValue() {
4494+
Completable.complete().doOnDispose(null);
4495+
}
4496+
4497+
@Test(expected = NullPointerException.class)
4498+
public void doOnEventNullValue() {
44694499
Completable.complete().doOnEvent(null);
44704500
}
44714501

0 commit comments

Comments
 (0)