Skip to content

Commit c7c269c

Browse files
committed
doOnEach - reduce allocations, report to RxJavaHooks, use OnSubscribe (ReactiveX#4262)
1 parent f3495ac commit c7c269c

File tree

7 files changed

+315
-105
lines changed

7 files changed

+315
-105
lines changed

src/main/java/rx/Observable.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5507,9 +5507,9 @@ public final Observable<T> distinctUntilChanged(Func2<? super T, ? super T, Bool
55075507
public final Observable<T> doOnCompleted(final Action0 onCompleted) {
55085508
Action1<T> onNext = Actions.empty();
55095509
Action1<Throwable> onError = Actions.empty();
5510-
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
5510+
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);
55115511

5512-
return lift(new OperatorDoOnEach<T>(observer));
5512+
return create(new OnSubscribeDoOnEach<T>(this, observer));
55135513
}
55145514

55155515
/**
@@ -5531,8 +5531,7 @@ public final Observable<T> doOnCompleted(final Action0 onCompleted) {
55315531
*/
55325532
public final Observable<T> doOnEach(final Action1<Notification<? super T>> onNotification) {
55335533
Observer<T> observer = new ActionNotificationObserver<T>(onNotification);
5534-
5535-
return lift(new OperatorDoOnEach<T>(observer));
5534+
return create(new OnSubscribeDoOnEach<T>(this, observer));
55365535
}
55375536

55385537
/**
@@ -5559,7 +5558,7 @@ public final Observable<T> doOnEach(final Action1<Notification<? super T>> onNot
55595558
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
55605559
*/
55615560
public final Observable<T> doOnEach(Observer<? super T> observer) {
5562-
return lift(new OperatorDoOnEach<T>(observer));
5561+
return create(new OnSubscribeDoOnEach<T>(this, observer));
55635562
}
55645563

55655564
/**
@@ -5585,9 +5584,9 @@ public final Observable<T> doOnEach(Observer<? super T> observer) {
55855584
public final Observable<T> doOnError(final Action1<Throwable> onError) {
55865585
Action1<T> onNext = Actions.empty();
55875586
Action0 onCompleted = Actions.empty();
5588-
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
5587+
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);
55895588

5590-
return lift(new OperatorDoOnEach<T>(observer));
5589+
return create(new OnSubscribeDoOnEach<T>(this, observer));
55915590
}
55925591

55935592
/**
@@ -5610,9 +5609,9 @@ public final Observable<T> doOnError(final Action1<Throwable> onError) {
56105609
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
56115610
Action1<Throwable> onError = Actions.empty();
56125611
Action0 onCompleted = Actions.empty();
5613-
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
5612+
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);
56145613

5615-
return lift(new OperatorDoOnEach<T>(observer));
5614+
return create(new OnSubscribeDoOnEach<T>(this, observer));
56165615
}
56175616

56185617
/**
@@ -5693,9 +5692,9 @@ public final Observable<T> doOnTerminate(final Action0 onTerminate) {
56935692
Action1<T> onNext = Actions.empty();
56945693
Action1<Throwable> onError = Actions.toAction1(onTerminate);
56955694

5696-
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onTerminate);
5695+
Observer<T> observer = new ActionObserver<T>(onNext, onError, onTerminate);
56975696

5698-
return lift(new OperatorDoOnEach<T>(observer));
5697+
return create(new OnSubscribeDoOnEach<T>(this, observer));
56995698
}
57005699

57015700
/**

src/main/java/rx/Single.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2413,7 +2413,7 @@ public void onNext(T t) {
24132413
}
24142414
};
24152415

2416-
return lift(new OperatorDoOnEach<T>(observer));
2416+
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
24172417
}
24182418

24192419
/**
@@ -2449,7 +2449,7 @@ public void onNext(T t) {
24492449
}
24502450
};
24512451

2452-
return lift(new OperatorDoOnEach<T>(observer));
2452+
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
24532453
}
24542454

24552455
/**
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.Arrays;
19+
20+
import rx.*;
21+
import rx.Observable.OnSubscribe;
22+
import rx.exceptions.*;
23+
import rx.plugins.RxJavaHooks;
24+
25+
/**
26+
* Calls specified actions for each notification.
27+
*
28+
* @param <T> the value type
29+
*/
30+
public class OnSubscribeDoOnEach<T> implements OnSubscribe<T> {
31+
private final Observer<? super T> doOnEachObserver;
32+
private final Observable<T> source;
33+
34+
public OnSubscribeDoOnEach(Observable<T> source, Observer<? super T> doOnEachObserver) {
35+
this.source = source;
36+
this.doOnEachObserver = doOnEachObserver;
37+
}
38+
39+
@Override
40+
public void call(final Subscriber<? super T> subscriber) {
41+
source.unsafeSubscribe(new DoOnEachSubscriber<T>(subscriber, doOnEachObserver));
42+
}
43+
44+
private static final class DoOnEachSubscriber<T> extends Subscriber<T> {
45+
46+
private final Subscriber<? super T> subscriber;
47+
private final Observer<? super T> doOnEachObserver;
48+
49+
private boolean done;
50+
51+
DoOnEachSubscriber(Subscriber<? super T> subscriber, Observer<? super T> doOnEachObserver) {
52+
super(subscriber);
53+
this.subscriber = subscriber;
54+
this.doOnEachObserver = doOnEachObserver;
55+
}
56+
57+
@Override
58+
public void onCompleted() {
59+
if (done) {
60+
return;
61+
}
62+
try {
63+
doOnEachObserver.onCompleted();
64+
} catch (Throwable e) {
65+
Exceptions.throwOrReport(e, this);
66+
return;
67+
}
68+
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
69+
done = true;
70+
subscriber.onCompleted();
71+
}
72+
73+
@Override
74+
public void onError(Throwable e) {
75+
if (done) {
76+
RxJavaHooks.onError(e);
77+
return;
78+
}
79+
done = true;
80+
try {
81+
doOnEachObserver.onError(e);
82+
} catch (Throwable e2) {
83+
Exceptions.throwIfFatal(e2);
84+
subscriber.onError(new CompositeException(Arrays.asList(e, e2)));
85+
return;
86+
}
87+
subscriber.onError(e);
88+
}
89+
90+
@Override
91+
public void onNext(T value) {
92+
if (done) {
93+
return;
94+
}
95+
try {
96+
doOnEachObserver.onNext(value);
97+
} catch (Throwable e) {
98+
Exceptions.throwOrReport(e, this, value);
99+
return;
100+
}
101+
subscriber.onNext(value);
102+
}
103+
}
104+
}

src/main/java/rx/internal/operators/OperatorDoOnEach.java

Lines changed: 0 additions & 90 deletions
This file was deleted.
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import rx.Observer;
19+
import rx.functions.*;
20+
21+
/**
22+
* An Observer that forwards the onXXX method calls to callbacks.
23+
* @param <T> the value type
24+
*/
25+
public final class ActionObserver<T> implements Observer<T> {
26+
27+
final Action1<? super T> onNext;
28+
final Action1<Throwable> onError;
29+
final Action0 onCompleted;
30+
31+
public ActionObserver(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {
32+
this.onNext = onNext;
33+
this.onError = onError;
34+
this.onCompleted = onCompleted;
35+
}
36+
37+
@Override
38+
public void onNext(T t) {
39+
onNext.call(t);
40+
}
41+
42+
@Override
43+
public void onError(Throwable e) {
44+
onError.call(e);
45+
}
46+
47+
@Override
48+
public void onCompleted() {
49+
onCompleted.call();
50+
}
51+
}

0 commit comments

Comments
 (0)