Skip to content

doOnEach - reduce allocations, report to RxJavaHooks #4262

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 31, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5507,9 +5507,9 @@ public final Observable<T> distinctUntilChanged(Func2<? super T, ? super T, Bool
public final Observable<T> doOnCompleted(final Action0 onCompleted) {
Action1<T> onNext = Actions.empty();
Action1<Throwable> onError = Actions.empty();
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);

return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand All @@ -5531,8 +5531,7 @@ public final Observable<T> doOnCompleted(final Action0 onCompleted) {
*/
public final Observable<T> doOnEach(final Action1<Notification<? super T>> onNotification) {
Observer<T> observer = new ActionNotificationObserver<T>(onNotification);

return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand All @@ -5559,7 +5558,7 @@ public final Observable<T> doOnEach(final Action1<Notification<? super T>> onNot
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
public final Observable<T> doOnEach(Observer<? super T> observer) {
return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand All @@ -5585,9 +5584,9 @@ public final Observable<T> doOnEach(Observer<? super T> observer) {
public final Observable<T> doOnError(final Action1<Throwable> onError) {
Action1<T> onNext = Actions.empty();
Action0 onCompleted = Actions.empty();
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);

return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand All @@ -5610,9 +5609,9 @@ public final Observable<T> doOnError(final Action1<Throwable> onError) {
public final Observable<T> doOnNext(final Action1<? super T> onNext) {
Action1<Throwable> onError = Actions.empty();
Action0 onCompleted = Actions.empty();
Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onCompleted);
Observer<T> observer = new ActionObserver<T>(onNext, onError, onCompleted);

return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand Down Expand Up @@ -5693,9 +5692,9 @@ public final Observable<T> doOnTerminate(final Action0 onTerminate) {
Action1<T> onNext = Actions.empty();
Action1<Throwable> onError = Actions.toAction1(onTerminate);

Observer<T> observer = new ActionSubscriber<T>(onNext, onError, onTerminate);
Observer<T> observer = new ActionObserver<T>(onNext, onError, onTerminate);

return lift(new OperatorDoOnEach<T>(observer));
return create(new OnSubscribeDoOnEach<T>(this, observer));
}

/**
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2413,7 +2413,7 @@ public void onNext(T t) {
}
};

return lift(new OperatorDoOnEach<T>(observer));
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
}

/**
Expand Down Expand Up @@ -2449,7 +2449,7 @@ public void onNext(T t) {
}
};

return lift(new OperatorDoOnEach<T>(observer));
return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
}

/**
Expand Down
104 changes: 104 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeDoOnEach.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.Arrays;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.*;
import rx.plugins.RxJavaHooks;

/**
* Calls specified actions for each notification.
*
* @param <T> the value type
*/
public class OnSubscribeDoOnEach<T> implements OnSubscribe<T> {
private final Observer<? super T> doOnEachObserver;
private final Observable<T> source;

public OnSubscribeDoOnEach(Observable<T> source, Observer<? super T> doOnEachObserver) {
this.source = source;
this.doOnEachObserver = doOnEachObserver;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
source.unsafeSubscribe(new DoOnEachSubscriber<T>(subscriber, doOnEachObserver));
}

private static final class DoOnEachSubscriber<T> extends Subscriber<T> {

private final Subscriber<? super T> subscriber;
private final Observer<? super T> doOnEachObserver;

private boolean done;

DoOnEachSubscriber(Subscriber<? super T> subscriber, Observer<? super T> doOnEachObserver) {
super(subscriber);
this.subscriber = subscriber;
this.doOnEachObserver = doOnEachObserver;
}

@Override
public void onCompleted() {
if (done) {
return;
}
try {
doOnEachObserver.onCompleted();
} catch (Throwable e) {
Exceptions.throwOrReport(e, this);
return;
}
// Set `done` here so that the error in `doOnEachObserver.onCompleted()` can be noticed by observer
done = true;
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
try {
doOnEachObserver.onError(e);
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
subscriber.onError(new CompositeException(Arrays.asList(e, e2)));
return;
}
subscriber.onError(e);
}

@Override
public void onNext(T value) {
if (done) {
return;
}
try {
doOnEachObserver.onNext(value);
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, value);
return;
}
subscriber.onNext(value);
}
}
}
90 changes: 0 additions & 90 deletions src/main/java/rx/internal/operators/OperatorDoOnEach.java

This file was deleted.

51 changes: 51 additions & 0 deletions src/main/java/rx/internal/util/ActionObserver.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.util;

import rx.Observer;
import rx.functions.*;

/**
* An Observer that forwards the onXXX method calls to callbacks.
* @param <T> the value type
*/
public final class ActionObserver<T> implements Observer<T> {

final Action1<? super T> onNext;
final Action1<Throwable> onError;
final Action0 onCompleted;

public ActionObserver(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted) {
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}

@Override
public void onNext(T t) {
onNext.call(t);
}

@Override
public void onError(Throwable e) {
onError.call(e);
}

@Override
public void onCompleted() {
onCompleted.call();
}
}
Loading