Skip to content

Commit 72ebf31

Browse files
committed
Switch operator should now propagate onError in subsequences.
1 parent 0e1cf35 commit 72ebf31

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

rxjava-core/src/main/java/rx/operators/OperationSwitch.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,6 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
20-
2118
import java.util.concurrent.TimeUnit;
2219
import java.util.concurrent.atomic.AtomicReference;
2320

@@ -30,9 +27,19 @@
3027
import rx.Subscription;
3128
import rx.concurrency.TestScheduler;
3229
import rx.subscriptions.Subscriptions;
30+
import rx.util.AtomicObservableSubscription;
3331
import rx.util.functions.Action0;
3432
import rx.util.functions.Func1;
3533

34+
import static org.mockito.Mockito.inOrder;
35+
import static org.mockito.Mockito.mock;
36+
import static org.mockito.Mockito.never;
37+
import static org.mockito.Mockito.times;
38+
import static org.mockito.Mockito.verify;
39+
40+
import static org.mockito.Matchers.any;
41+
import static org.mockito.Matchers.anyString;
42+
3643

3744
/**
3845
* This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single
@@ -67,54 +74,64 @@ public Switch(Observable<Observable<T>> sequences) {
6774

6875
@Override
6976
public Subscription call(Observer<T> observer) {
70-
return sequences.subscribe(new SwitchObserver<T>(observer));
77+
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
78+
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
79+
return subscription;
7180
}
7281
}
7382

7483
private static class SwitchObserver<T> implements Observer<Observable<T>> {
7584

76-
private final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();
77-
7885
private final Observer<T> observer;
86+
private final AtomicObservableSubscription parent;
87+
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();
7988

80-
public SwitchObserver(Observer<T> observer) {
89+
public SwitchObserver(Observer<T> observer, AtomicObservableSubscription parent) {
8190
this.observer = observer;
91+
this.parent = parent;
8292
}
8393

8494
@Override
8595
public void onCompleted() {
96+
unsubscribeFromSubSequence();
8697
observer.onCompleted();
8798
}
8899

89100
@Override
90101
public void onError(Exception e) {
102+
unsubscribeFromSubSequence();
91103
observer.onError(e);
92104
}
93105

94106
@Override
95107
public void onNext(Observable<T> args) {
96-
Subscription previousSubscription = subscription.get();
97-
if (previousSubscription != null) {
98-
previousSubscription.unsubscribe();
99-
}
108+
unsubscribeFromSubSequence();
100109

101-
subscription.set(args.subscribe(new Observer<T>() {
110+
subsequence.set(args.subscribe(new Observer<T>() {
102111
@Override
103112
public void onCompleted() {
104-
// Do nothing.
113+
// Do nothing.
105114
}
106115

107116
@Override
108117
public void onError(Exception e) {
109-
// Do nothing.
118+
parent.unsubscribe();
119+
observer.onError(e);
110120
}
111121

112-
@Override
122+
@Override
113123
public void onNext(T args) {
114124
observer.onNext(args);
115125
}
116126
}));
117127
}
128+
129+
private void unsubscribeFromSubSequence() {
130+
Subscription previousSubscription = subsequence.get();
131+
if (previousSubscription != null) {
132+
previousSubscription.unsubscribe();
133+
}
134+
}
118135
}
119136

120137
public static class UnitTest {
@@ -299,7 +316,6 @@ public Subscription call(Observer<String> observer) {
299316
verify(observer, never()).onError(any(Exception.class));
300317

301318
scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
302-
inOrder.verify(observer, times(1)).onNext("two");
303319
inOrder.verify(observer, times(1)).onNext("three");
304320
verify(observer, never()).onCompleted();
305321
verify(observer, never()).onError(any(Exception.class));
@@ -355,10 +371,9 @@ public Subscription call(Observer<String> observer) {
355371
verify(observer, never()).onError(any(Exception.class));
356372

357373
scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
358-
inOrder.verify(observer, times(1)).onNext("two");
359-
inOrder.verify(observer, times(1)).onNext("three");
374+
inOrder.verify(observer, never()).onNext("three");
360375
verify(observer, never()).onCompleted();
361-
verify(observer, never()).onError(any(Exception.class));
376+
verify(observer, times(1)).onError(any(TestException.class));
362377
}
363378

364379
private <T> void publishCompleted(final Observer<T> observer, long delay) {

0 commit comments

Comments
 (0)