Skip to content

Commit 6f4c27e

Browse files
Merge pull request #1830 from benjchristensen/313-mergeDelayError
Fix mergeDelayError Handling of Error in Parent Observable
2 parents 18d6f3e + 6349171 commit 6f4c27e

File tree

2 files changed

+73
-3
lines changed

2 files changed

+73
-3
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,11 @@ public Boolean call(InnerSubscriber<T> s) {
397397

398398
@Override
399399
public void onError(Throwable e) {
400+
completed = true;
401+
innerError(e);
402+
}
403+
404+
private void innerError(Throwable e) {
400405
if (delayErrors) {
401406
synchronized (this) {
402407
if (exceptions == null) {
@@ -540,7 +545,7 @@ public void onNext(T t) {
540545
public void onError(Throwable e) {
541546
// it doesn't go through queues, it immediately onErrors and tears everything down
542547
if (ONCE_TERMINATED.compareAndSet(this, 0, 1)) {
543-
parentSubscriber.onError(e);
548+
parentSubscriber.innerError(e);
544549
}
545550
}
546551

@@ -753,4 +758,4 @@ private int drainQueue() {
753758
}
754759
}
755760
}
756-
}
761+
}

src/test/java/rx/internal/operators/OperatorMergeDelayErrorTest.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,20 @@
2020
import org.mockito.InOrder;
2121
import org.mockito.Mock;
2222
import org.mockito.MockitoAnnotations;
23+
2324
import rx.Observable;
2425
import rx.Observable.OnSubscribe;
2526
import rx.Observer;
2627
import rx.Subscriber;
2728
import rx.exceptions.CompositeException;
2829
import rx.exceptions.TestException;
30+
import rx.observers.TestSubscriber;
2931

3032
import java.util.ArrayList;
33+
import java.util.Arrays;
3134
import java.util.List;
3235
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.TimeUnit;
3337

3438
import static org.junit.Assert.*;
3539
import static org.mockito.Matchers.any;
@@ -475,4 +479,65 @@ public void onCompleted() {
475479
inOrder.verify(o).onError(any(TestException.class));
476480
verify(o, never()).onCompleted();
477481
}
478-
}
482+
483+
@Test
484+
public void testErrorInParentObservable() {
485+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
486+
Observable.mergeDelayError(
487+
Observable.just(Observable.just(1), Observable.just(2))
488+
.startWith(Observable.<Integer> error(new RuntimeException()))
489+
).subscribe(ts);
490+
ts.awaitTerminalEvent();
491+
ts.assertTerminalEvent();
492+
ts.assertReceivedOnNext(Arrays.asList(1, 2));
493+
assertEquals(1, ts.getOnErrorEvents().size());
494+
495+
}
496+
497+
@Test
498+
public void testErrorInParentObservableDelayed() throws Exception {
499+
final TestASynchronous1sDelayedObservable o1 = new TestASynchronous1sDelayedObservable();
500+
final TestASynchronous1sDelayedObservable o2 = new TestASynchronous1sDelayedObservable();
501+
Observable<Observable<String>> parentObservable = Observable.create(new Observable.OnSubscribe<Observable<String>>() {
502+
@Override
503+
public void call(Subscriber<? super Observable<String>> op) {
504+
op.onNext(Observable.create(o1));
505+
op.onNext(Observable.create(o2));
506+
op.onError(new NullPointerException("throwing exception in parent"));
507+
}
508+
});
509+
510+
TestSubscriber<String> ts = new TestSubscriber<String>(stringObserver);
511+
Observable<String> m = Observable.mergeDelayError(parentObservable);
512+
m.subscribe(ts);
513+
ts.awaitTerminalEvent(2000, TimeUnit.MILLISECONDS);
514+
ts.assertTerminalEvent();
515+
516+
verify(stringObserver, times(2)).onNext("hello");
517+
verify(stringObserver, times(1)).onError(any(NullPointerException.class));
518+
verify(stringObserver, never()).onCompleted();
519+
}
520+
521+
private static class TestASynchronous1sDelayedObservable implements Observable.OnSubscribe<String> {
522+
Thread t;
523+
524+
@Override
525+
public void call(final Subscriber<? super String> observer) {
526+
t = new Thread(new Runnable() {
527+
528+
@Override
529+
public void run() {
530+
try {
531+
Thread.sleep(100);
532+
} catch (InterruptedException e) {
533+
observer.onError(e);
534+
}
535+
observer.onNext("hello");
536+
observer.onCompleted();
537+
}
538+
539+
});
540+
t.start();
541+
}
542+
}
543+
}

0 commit comments

Comments
 (0)