Skip to content

Commit 4165091

Browse files
committed
1.x: OperatorMapPair should unsubscribe on crash eagerly (#3896)
1 parent 123c5f9 commit 4165091

File tree

2 files changed

+117
-28
lines changed

2 files changed

+117
-28
lines changed

src/main/java/rx/internal/operators/OperatorMapPair.java

Lines changed: 78 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
18+
import rx.*;
1919
import rx.Observable.Operator;
2020
import rx.exceptions.*;
21-
import rx.Subscriber;
22-
import rx.functions.Func1;
23-
import rx.functions.Func2;
21+
import rx.functions.*;
22+
import rx.internal.util.RxJavaPluginUtils;
2423

2524
/**
2625
* An {@link Operator} that pairs up items emitted by a source {@link Observable} with the sequence of items
@@ -45,6 +44,7 @@ public final class OperatorMapPair<T, U, R> implements Operator<Observable<? ext
4544
*/
4645
public static <T, U> Func1<T, Observable<U>> convertSelector(final Func1<? super T, ? extends Iterable<? extends U>> selector) {
4746
return new Func1<T, Observable<U>>() {
47+
@SuppressWarnings("cast")
4848
@Override
4949
public Observable<U> call(T t1) {
5050
return (Observable<U>)Observable.from(selector.call(t1));
@@ -62,34 +62,84 @@ public OperatorMapPair(final Func1<? super T, ? extends Observable<? extends U>>
6262

6363
@Override
6464
public Subscriber<? super T> call(final Subscriber<? super Observable<? extends R>> o) {
65-
return new Subscriber<T>(o) {
65+
MapPairSubscriber<T, U, R> parent = new MapPairSubscriber<T, U, R>(o, collectionSelector, resultSelector);
66+
o.add(parent);
67+
return parent;
68+
}
69+
70+
static final class MapPairSubscriber<T, U, R> extends Subscriber<T> {
71+
72+
final Subscriber<? super Observable<? extends R>> actual;
73+
74+
final Func1<? super T, ? extends Observable<? extends U>> collectionSelector;
75+
final Func2<? super T, ? super U, ? extends R> resultSelector;
6676

67-
@Override
68-
public void onCompleted() {
69-
o.onCompleted();
77+
boolean done;
78+
79+
public MapPairSubscriber(Subscriber<? super Observable<? extends R>> actual,
80+
Func1<? super T, ? extends Observable<? extends U>> collectionSelector,
81+
Func2<? super T, ? super U, ? extends R> resultSelector) {
82+
this.actual = actual;
83+
this.collectionSelector = collectionSelector;
84+
this.resultSelector = resultSelector;
85+
}
86+
87+
@Override
88+
public void onNext(T outer) {
89+
90+
Observable<? extends U> intermediate;
91+
92+
try {
93+
intermediate = collectionSelector.call(outer);
94+
} catch (Throwable ex) {
95+
Exceptions.throwIfFatal(ex);
96+
unsubscribe();
97+
onError(OnErrorThrowable.addValueAsLastCause(ex, outer));
98+
return;
7099
}
71-
72-
@Override
73-
public void onError(Throwable e) {
74-
o.onError(e);
100+
101+
actual.onNext(intermediate.map(new OuterInnerMapper<T, U, R>(outer, resultSelector)));
102+
}
103+
104+
@Override
105+
public void onError(Throwable e) {
106+
if (done) {
107+
RxJavaPluginUtils.handleException(e);
108+
return;
75109
}
76-
77-
@Override
78-
public void onNext(final T outer) {
79-
try {
80-
o.onNext(collectionSelector.call(outer).map(new Func1<U, R>() {
81-
82-
@Override
83-
public R call(U inner) {
84-
return resultSelector.call(outer, inner);
85-
}
86-
}));
87-
} catch (Throwable e) {
88-
Exceptions.throwOrReport(e, o, outer);
89-
}
110+
done = true;
111+
112+
actual.onError(e);
113+
}
114+
115+
116+
@Override
117+
public void onCompleted() {
118+
if (done) {
119+
return;
90120
}
91-
92-
};
121+
actual.onCompleted();
122+
}
123+
124+
@Override
125+
public void setProducer(Producer p) {
126+
actual.setProducer(p);
127+
}
93128
}
94129

130+
static final class OuterInnerMapper<T, U, R> implements Func1<U, R> {
131+
final T outer;
132+
final Func2<? super T, ? super U, ? extends R> resultSelector;
133+
134+
public OuterInnerMapper(T outer, Func2<? super T, ? super U, ? extends R> resultSelector) {
135+
this.outer = outer;
136+
this.resultSelector = resultSelector;
137+
}
138+
139+
@Override
140+
public R call(U inner) {
141+
return resultSelector.call(outer, inner);
142+
}
143+
144+
}
95145
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx.internal.operators;
2+
3+
import org.junit.*;
4+
5+
import rx.Observable;
6+
import rx.exceptions.TestException;
7+
import rx.functions.*;
8+
import rx.observers.TestSubscriber;
9+
import rx.subjects.PublishSubject;
10+
11+
public class OperatorMapPairTest {
12+
@Test
13+
public void castCrashUnsubscribes() {
14+
15+
PublishSubject<Integer> ps = PublishSubject.create();
16+
17+
TestSubscriber<Integer> ts = TestSubscriber.create();
18+
19+
ps.flatMap(new Func1<Integer, Observable<Integer>>() {
20+
@Override
21+
public Observable<Integer> call(Integer t) {
22+
throw new TestException();
23+
}
24+
}, new Func2<Integer, Integer, Integer>() {
25+
@Override
26+
public Integer call(Integer t1, Integer t2) {
27+
return t1;
28+
}
29+
}).unsafeSubscribe(ts);
30+
31+
Assert.assertTrue("Not subscribed?", ps.hasObservers());
32+
33+
ps.onNext(1);
34+
35+
Assert.assertFalse("Subscribed?", ps.hasObservers());
36+
37+
ts.assertError(TestException.class);
38+
}
39+
}

0 commit comments

Comments
 (0)