Skip to content

Commit 2944606

Browse files
Merge pull request #1367 from zsxwing/swallow-error
Fix the bug that 'flatMap' swallows OnErrorNotImplementedException
2 parents 1843841 + 84c72a5 commit 2944606

File tree

3 files changed

+15
-0
lines changed

3 files changed

+15
-0
lines changed

rxjava-core/src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable;
2121
import rx.Observable.Operator;
2222
import rx.Subscriber;
23+
import rx.exceptions.Exceptions;
2324
import rx.observers.SerializedSubscriber;
2425
import rx.subscriptions.CompositeSubscription;
2526

@@ -111,6 +112,7 @@ public void onNext(T t) {
111112

112113
@Override
113114
public void onError(Throwable e) {
115+
Exceptions.throwIfFatal(e);
114116
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
115117
parent.onError(e);
116118
}

rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.observers;
1717

1818
import rx.Observer;
19+
import rx.exceptions.Exceptions;
1920

2021
/**
2122
* Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
@@ -100,6 +101,7 @@ public void onCompleted() {
100101

101102
@Override
102103
public void onError(final Throwable e) {
104+
Exceptions.throwIfFatal(e);
103105
FastList list;
104106
synchronized (this) {
105107
if (terminated) {

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.mockito.MockitoAnnotations;
4747

4848
import rx.Observable.OnSubscribe;
49+
import rx.exceptions.OnErrorNotImplementedException;
4950
import rx.functions.Action1;
5051
import rx.functions.Action2;
5152
import rx.functions.Func1;
@@ -1016,4 +1017,14 @@ public void testAmbWith() {
10161017
ts.assertReceivedOnNext(Arrays.asList(1));
10171018
}
10181019

1020+
@Test(expected = OnErrorNotImplementedException.class)
1021+
public void testSubscribeWithoutOnError() {
1022+
Observable<String> o = Observable.from("a", "b").flatMap(new Func1<String, Observable<String>>() {
1023+
@Override
1024+
public Observable<String> call(String s) {
1025+
return Observable.error(new Exception("test"));
1026+
}
1027+
});
1028+
o.subscribe();
1029+
}
10191030
}

0 commit comments

Comments
 (0)