Skip to content

Commit d36b626

Browse files
committed
Merge pull request #3766 from artem-zinnatullin/single-on-error-resume-next-with-function
1.x: Add Single.onErrorResumeNext(Func)
2 parents 4fd0c60 + 0495044 commit d36b626

File tree

5 files changed

+184
-47
lines changed

5 files changed

+184
-47
lines changed

src/main/java/rx/Single.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1430,9 +1430,46 @@ public final Single<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunctio
14301430
* @param resumeSingleInCaseOfError a Single that will take control if source Single encounters an error.
14311431
* @return the original Single, with appropriately modified behavior.
14321432
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
1433+
* @Experimental The behavior of this can change at any time.
1434+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
14331435
*/
1436+
@Experimental
14341437
public final Single<T> onErrorResumeNext(Single<? extends T> resumeSingleInCaseOfError) {
1435-
return new Single<T>(new SingleOperatorOnErrorResumeNextViaSingle<T>(this, resumeSingleInCaseOfError));
1438+
return new Single<T>(SingleOperatorOnErrorResumeNext.withOther(this, resumeSingleInCaseOfError));
1439+
}
1440+
1441+
/**
1442+
* Instructs a Single to pass control to another Single rather than invoking
1443+
* {@link Observer#onError(Throwable)} if it encounters an error.
1444+
* <p/>
1445+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorResumeNext.png" alt="">
1446+
* <p/>
1447+
* By default, when a Single encounters an error that prevents it from emitting the expected item to
1448+
* its {@link Observer}, the Single invokes its Observer's {@code onError} method, and then quits
1449+
* without invoking any more of its Observer's methods. The {@code onErrorResumeNext} method changes this
1450+
* behavior. If you pass a function that will return another Single ({@code resumeFunctionInCaseOfError}) to an Single's
1451+
* {@code onErrorResumeNext} method, if the original Single encounters an error, instead of invoking its
1452+
* Observer's {@code onError} method, it will instead relinquish control to {@code resumeSingleInCaseOfError} which
1453+
* will invoke the Observer's {@link Observer#onNext onNext} method if it is able to do so. In such a case,
1454+
* because no Single necessarily invokes {@code onError}, the Observer may never know that an error
1455+
* happened.
1456+
* <p/>
1457+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
1458+
* encountered.
1459+
* <dl>
1460+
* <dt><b>Scheduler:</b></dt>
1461+
* <dd>{@code onErrorResumeNext} does not operate by default on a particular {@link Scheduler}.</dd>
1462+
* </dl>
1463+
*
1464+
* @param resumeFunctionInCaseOfError a function that returns a Single that will take control if source Single encounters an error.
1465+
* @return the original Single, with appropriately modified behavior.
1466+
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
1467+
* @Experimental The behavior of this can change at any time.
1468+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
1469+
*/
1470+
@Experimental
1471+
public final Single<T> onErrorResumeNext(final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
1472+
return new Single<T>(SingleOperatorOnErrorResumeNext.withFunction(this, resumeFunctionInCaseOfError));
14361473
}
14371474

14381475
/**

src/main/java/rx/exceptions/Exceptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.*;
1919

2020
import rx.Observer;
21+
import rx.SingleSubscriber;
2122
import rx.annotations.Experimental;
2223

2324
/**
@@ -188,6 +189,7 @@ public static void throwOrReport(Throwable t, Observer<?> o, Object value) {
188189
Exceptions.throwIfFatal(t);
189190
o.onError(OnErrorThrowable.addValueAsLastCause(t, value));
190191
}
192+
191193
/**
192194
* Forwards a fatal exception or reports it to the given Observer.
193195
* @param t the exception
@@ -199,4 +201,17 @@ public static void throwOrReport(Throwable t, Observer<?> o) {
199201
Exceptions.throwIfFatal(t);
200202
o.onError(t);
201203
}
204+
205+
/**
206+
* Forwards a fatal exception or reports it to the given Observer.
207+
*
208+
* @param throwable the exception.
209+
* @param subscriber the subscriber to report to.
210+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number).
211+
*/
212+
@Experimental
213+
public static void throwOrReport(Throwable throwable, SingleSubscriber<?> subscriber) {
214+
Exceptions.throwIfFatal(throwable);
215+
subscriber.onError(throwable);
216+
}
202217
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package rx.internal.operators;
2+
3+
import rx.Single;
4+
import rx.SingleSubscriber;
5+
import rx.exceptions.Exceptions;
6+
import rx.functions.Func1;
7+
import rx.plugins.RxJavaPlugins;
8+
9+
public class SingleOperatorOnErrorResumeNext<T> implements Single.OnSubscribe<T> {
10+
11+
private final Single<? extends T> originalSingle;
12+
private final Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError;
13+
14+
private SingleOperatorOnErrorResumeNext(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
15+
if (originalSingle == null) {
16+
throw new NullPointerException("originalSingle must not be null");
17+
}
18+
19+
if (resumeFunctionInCaseOfError == null) {
20+
throw new NullPointerException("resumeFunctionInCaseOfError must not be null");
21+
}
22+
23+
this.originalSingle = originalSingle;
24+
this.resumeFunctionInCaseOfError = resumeFunctionInCaseOfError;
25+
}
26+
27+
public static <T> SingleOperatorOnErrorResumeNext<T> withFunction(Single<? extends T> originalSingle, Func1<Throwable, ? extends Single<? extends T>> resumeFunctionInCaseOfError) {
28+
return new SingleOperatorOnErrorResumeNext<T>(originalSingle, resumeFunctionInCaseOfError);
29+
}
30+
31+
public static <T> SingleOperatorOnErrorResumeNext<T> withOther(Single<? extends T> originalSingle, final Single<? extends T> resumeSingleInCaseOfError) {
32+
if (resumeSingleInCaseOfError == null) {
33+
throw new NullPointerException("resumeSingleInCaseOfError must not be null");
34+
}
35+
36+
return new SingleOperatorOnErrorResumeNext<T>(originalSingle, new Func1<Throwable, Single<? extends T>>() {
37+
@Override
38+
public Single<? extends T> call(Throwable throwable) {
39+
return resumeSingleInCaseOfError;
40+
}
41+
});
42+
}
43+
44+
@Override
45+
public void call(final SingleSubscriber<? super T> child) {
46+
final SingleSubscriber<? super T> parent = new SingleSubscriber<T>() {
47+
@Override
48+
public void onSuccess(T value) {
49+
child.onSuccess(value);
50+
}
51+
52+
@Override
53+
public void onError(Throwable error) {
54+
try {
55+
resumeFunctionInCaseOfError.call(error).subscribe(child);
56+
} catch (Throwable innerError) {
57+
Exceptions.throwOrReport(innerError, child);
58+
}
59+
}
60+
};
61+
62+
child.add(parent);
63+
originalSingle.subscribe(parent);
64+
}
65+
}

src/main/java/rx/internal/operators/SingleOperatorOnErrorResumeNextViaSingle.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

src/test/java/rx/SingleTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1220,13 +1220,78 @@ public void onErrorResumeNextViaSingleShouldPreventNullSingle() {
12201220
try {
12211221
Single
12221222
.just("value")
1223-
.onErrorResumeNext(null);
1223+
.onErrorResumeNext((Single<String>) null);
12241224
fail();
12251225
} catch (NullPointerException expected) {
12261226
assertEquals("resumeSingleInCaseOfError must not be null", expected.getMessage());
12271227
}
12281228
}
12291229

1230+
@Test
1231+
public void onErrorResumeNextViaFunctionShouldNotInterruptSuccesfulSingle() {
1232+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1233+
1234+
Single
1235+
.just("success")
1236+
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
1237+
@Override
1238+
public Single<? extends String> call(Throwable throwable) {
1239+
return Single.just("fail");
1240+
}
1241+
})
1242+
.subscribe(testSubscriber);
1243+
1244+
testSubscriber.assertValue("success");
1245+
}
1246+
1247+
@Test
1248+
public void onErrorResumeNextViaFunctionShouldResumeWithPassedSingleInCaseOfError() {
1249+
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
1250+
1251+
Single
1252+
.<String> error(new RuntimeException("test exception"))
1253+
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
1254+
@Override
1255+
public Single<? extends String> call(Throwable throwable) {
1256+
return Single.just("fallback");
1257+
}
1258+
})
1259+
.subscribe(testSubscriber);
1260+
1261+
testSubscriber.assertValue("fallback");
1262+
}
1263+
1264+
@Test
1265+
public void onErrorResumeNextViaFunctionShouldPreventNullFunction() {
1266+
try {
1267+
Single
1268+
.just("value")
1269+
.onErrorResumeNext((Func1<Throwable, ? extends Single<? extends String>>) null);
1270+
fail();
1271+
} catch (NullPointerException expected) {
1272+
assertEquals("resumeFunctionInCaseOfError must not be null", expected.getMessage());
1273+
}
1274+
}
1275+
1276+
@Test
1277+
public void onErrorResumeNextViaFunctionShouldFailIfFunctionReturnsNull() {
1278+
try {
1279+
Single
1280+
.error(new TestException())
1281+
.onErrorResumeNext(new Func1<Throwable, Single<? extends String>>() {
1282+
@Override
1283+
public Single<? extends String> call(Throwable throwable) {
1284+
return null;
1285+
}
1286+
})
1287+
.subscribe();
1288+
1289+
fail();
1290+
} catch (OnErrorNotImplementedException expected) {
1291+
assertTrue(expected.getCause() instanceof NullPointerException);
1292+
}
1293+
}
1294+
12301295
@Test(expected = NullPointerException.class)
12311296
public void iterableToArrayShouldThrowNullPointerExceptionIfIterableNull() {
12321297
Single.iterableToArray(null);

0 commit comments

Comments
 (0)