Skip to content

1.x: Single add doOnEach #4461

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 49 additions & 24 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2406,27 +2406,54 @@ public final <T2, R> Single<R> zipWith(Single<? extends T2> other, Func2<? super
* the action to invoke if the source {@link Single} calls {@code onError}
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Single<T> doOnError(final Action1<Throwable> onError) {
Observer<T> observer = new Observer<T>() {
if (onError == null) {
throw new IllegalArgumentException("onError is null");
}

return Single.create(new SingleDoOnEvent<T>(this, Actions.empty(), new Action1<Throwable>() {
@Override
public void onCompleted() {
// deliberately ignored
public void call(final Throwable throwable) {
onError.call(throwable);
}
}));
}

/**
* Modifies the source {@link Single} so that it invokes an action when it calls {@code onSuccess} or {@code onError}.
* <p>
* <img width="640" height="310" src="https://raw.githubusercontent.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnEach.png" alt="">
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we might need a new image here since there's no onNext

* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnEach} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onNotification
* the action to invoke when the source {@link Single} calls {@code onSuccess} or {@code onError}.
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

javadoc?

public final Single<T> doOnEach(final Action1<Notification<? extends T>> onNotification) {
if (onNotification == null) {
throw new IllegalArgumentException("onNotification is null");
}

return Single.create(new SingleDoOnEvent<T>(this, new Action1<T>() {
@Override
public void onError(Throwable e) {
onError.call(e);
public void call(final T t) {
onNotification.call(Notification.<T>createOnNext(t));
}

}, new Action1<Throwable>() {
@Override
public void onNext(T t) {
// deliberately ignored
public void call(final Throwable throwable) {
onNotification.call(Notification.<T>createOnError(throwable));
}
};

return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
}));
}

/**
Expand All @@ -2442,27 +2469,25 @@ public void onNext(T t) {
* the action to invoke when the source {@link Single} calls {@code onSuccess}
* @return the source {@link Single} with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final Single<T> doOnSuccess(final Action1<? super T> onSuccess) {
Observer<T> observer = new Observer<T>() {
@Override
public void onCompleted() {
// deliberately ignored
}
if (onSuccess == null) {
throw new IllegalArgumentException("onSuccess is null");
}

return Single.create(new SingleDoOnEvent<T>(this, new Action1<T>() {
@Override
public void onError(Throwable e) {
// deliberately ignored
public void call(final T t) {
onSuccess.call(t);
}

}, new Action1<Throwable>() {
@Override
public void onNext(T t) {
onSuccess.call(t);
public void call(final Throwable throwable) {
// Do nothing.
}
};

return Observable.create(new OnSubscribeDoOnEach<T>(this.toObservable(), observer)).toSingle();
}));
}

/**
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/rx/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,20 @@ public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
}

/**
* Forwards a fatal exception or reports it along with the value
* caused it to the given SingleSubscriber.
* @param t the exception
* @param o the observer to report to
* @param value the value that caused the exception
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static void throwOrReport(Throwable t, SingleSubscriber<?> o, Object value) {
Exceptions.throwIfFatal(t);
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
}

/**
* Forwards a fatal exception or reports it to the given Observer.
* @param t the exception
Expand Down
79 changes: 79 additions & 0 deletions src/main/java/rx/internal/operators/SingleDoOnEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx.internal.operators;

import rx.Single;
import rx.SingleSubscriber;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.functions.Action1;

public final class SingleDoOnEvent<T> implements Single.OnSubscribe<T> {
final Single<T> source;
final Action1<? super T> onSuccess;
final Action1<Throwable> onError;

public SingleDoOnEvent(Single<T> source, Action1<? super T> onSuccess, Action1<Throwable> onError) {
this.source = source;
this.onSuccess = onSuccess;
this.onError = onError;
}

@Override
public void call(SingleSubscriber<? super T> actual) {
SingleDoOnEventSubscriber<T> parent = new SingleDoOnEventSubscriber<T>(actual, onSuccess, onError);
actual.add(parent);
source.subscribe(parent);
}

static final class SingleDoOnEventSubscriber<T> extends SingleSubscriber<T> {
final SingleSubscriber<? super T> actual;
final Action1<? super T> onSuccess;
final Action1<Throwable> onError;

SingleDoOnEventSubscriber(SingleSubscriber<? super T> actual, Action1<? super T> onSuccess, Action1<Throwable> onError) {
this.actual = actual;
this.onSuccess = onSuccess;
this.onError = onError;
}

@Override
public void onSuccess(T value) {
try {
onSuccess.call(value);
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, value);
return;
}

actual.onSuccess(value);
}

@Override
public void onError(Throwable error) {
try {
onError.call(error);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(new CompositeException(error, e));
return;
}

actual.onError(error);
}
}
}
50 changes: 50 additions & 0 deletions src/test/java/rx/SingleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -869,6 +869,11 @@ public void toCompletableError() {
testSubscriber.assertNotCompleted();
}

@Test(expected = IllegalArgumentException.class)
public void doOnErrorNull() {
Single.just(1).doOnError(null);
}

@Test
public void doOnErrorShouldNotCallActionIfNoErrorHasOccurred() {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -974,6 +979,11 @@ public void shouldPassErrorFromCallable() throws Exception {
verify(callable).call();
}

@Test(expected = IllegalArgumentException.class)
public void doOnSuccessNull() {
Single.just(1).doOnSuccess(null);
}

@Test
public void doOnSuccessShouldInvokeAction() {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -2086,4 +2096,44 @@ public Object call(Single<String> single) {
assertSame(expectedResult, actualResult);
assertSame(s, singleRef.get());
}

@Test(expected = IllegalArgumentException.class)
public void doOnEachNull() {
Single.just(1).doOnEach(null);
}

@Test
public void doOnEachError() {
final AtomicInteger atomicInteger = new AtomicInteger(0);
Single.error(new RuntimeException()).doOnEach(new Action1<Notification<?>>() {
@Override
public void call(final Notification<?> notification) {
if (notification.isOnError()) {
atomicInteger.incrementAndGet();
}
}
}).subscribe(Actions.empty(), new Action1<Throwable>() {
@Override
public void call(final Throwable throwable) {
// Do nothing this is expected.
}
});

assertEquals(1, atomicInteger.get());
}

@Test
public void doOnEachSuccess() {
final AtomicInteger atomicInteger = new AtomicInteger(0);
Single.just(1).doOnEach(new Action1<Notification<? extends Integer>>() {
@Override
public void call(final Notification<? extends Integer> notification) {
if (notification.isOnNext()) {
atomicInteger.getAndAdd(notification.getValue());
}
}
}).subscribe();

assertEquals(1, atomicInteger.get());
}
}