Skip to content

Commit 35b51b3

Browse files
committed
1.x: cast() should unsubscribe on crash eagerly (#3895)
1 parent 4165091 commit 35b51b3

File tree

2 files changed

+79
-25
lines changed

2 files changed

+79
-25
lines changed

src/main/java/rx/internal/operators/OperatorCast.java

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import rx.*;
1819
import rx.Observable.Operator;
1920
import rx.exceptions.*;
20-
import rx.Subscriber;
21+
import rx.internal.util.RxJavaPluginUtils;
2122

2223
/**
2324
* Converts the elements of an observable sequence to the specified type.
@@ -32,26 +33,63 @@ public OperatorCast(Class<R> castClass) {
3233

3334
@Override
3435
public Subscriber<? super T> call(final Subscriber<? super R> o) {
35-
return new Subscriber<T>(o) {
36+
CastSubscriber<T, R> parent = new CastSubscriber<T, R>(o, castClass);
37+
o.add(parent);
38+
return parent;
39+
}
40+
41+
static final class CastSubscriber<T, R> extends Subscriber<T> {
42+
43+
final Subscriber<? super R> actual;
44+
45+
final Class<R> castClass;
3646

37-
@Override
38-
public void onCompleted() {
39-
o.onCompleted();
47+
boolean done;
48+
49+
public CastSubscriber(Subscriber<? super R> actual, Class<R> castClass) {
50+
this.actual = actual;
51+
this.castClass = castClass;
52+
}
53+
54+
@Override
55+
public void onNext(T t) {
56+
R result;
57+
58+
try {
59+
result = castClass.cast(t);
60+
} catch (Throwable ex) {
61+
Exceptions.throwIfFatal(ex);
62+
unsubscribe();
63+
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
64+
return;
4065
}
41-
42-
@Override
43-
public void onError(Throwable e) {
44-
o.onError(e);
66+
67+
actual.onNext(result);
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
if (done) {
73+
RxJavaPluginUtils.handleException(e);
74+
return;
4575
}
46-
47-
@Override
48-
public void onNext(T t) {
49-
try {
50-
o.onNext(castClass.cast(t));
51-
} catch (Throwable e) {
52-
Exceptions.throwOrReport(e, this, t);
53-
}
76+
done = true;
77+
78+
actual.onError(e);
79+
}
80+
81+
82+
@Override
83+
public void onCompleted() {
84+
if (done) {
85+
return;
5486
}
55-
};
87+
actual.onCompleted();
88+
}
89+
90+
@Override
91+
public void setProducer(Producer p) {
92+
actual.setProducer(p);
93+
}
5694
}
5795
}

src/test/java/rx/internal/operators/OperatorCastTest.java

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,13 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.mockito.Mockito.mock;
19-
import static org.mockito.Mockito.never;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
18+
import static org.mockito.Mockito.*;
2219

23-
import org.junit.Test;
20+
import org.junit.*;
2421

25-
import rx.Observable;
26-
import rx.Observer;
22+
import rx.*;
23+
import rx.observers.TestSubscriber;
24+
import rx.subjects.PublishSubject;
2725

2826
public class OperatorCastTest {
2927

@@ -53,4 +51,22 @@ public void testCastWithWrongType() {
5351
verify(observer, times(1)).onError(
5452
org.mockito.Matchers.any(ClassCastException.class));
5553
}
54+
55+
@Test
56+
public void castCrashUnsubscribes() {
57+
58+
PublishSubject<Integer> ps = PublishSubject.create();
59+
60+
TestSubscriber<String> ts = TestSubscriber.create();
61+
62+
ps.cast(String.class).unsafeSubscribe(ts);
63+
64+
Assert.assertTrue("Not subscribed?", ps.hasObservers());
65+
66+
ps.onNext(1);
67+
68+
Assert.assertFalse("Subscribed?", ps.hasObservers());
69+
70+
ts.assertError(ClassCastException.class);
71+
}
5672
}

0 commit comments

Comments
 (0)