|
16 | 16 | package rx.internal.operators;
|
17 | 17 |
|
18 | 18 | import static org.junit.Assert.assertEquals;
|
| 19 | +import static org.junit.Assert.assertFalse; |
19 | 20 |
|
20 | 21 | import java.util.concurrent.CountDownLatch;
|
| 22 | +import java.util.concurrent.atomic.AtomicBoolean; |
21 | 23 | import java.util.concurrent.atomic.AtomicInteger;
|
22 | 24 |
|
23 | 25 | import org.junit.Test;
|
|
26 | 28 | import rx.Observable.OnSubscribe;
|
27 | 29 | import rx.Observer;
|
28 | 30 | import rx.Subscriber;
|
| 31 | +import rx.functions.Action0; |
| 32 | +import rx.functions.Action1; |
29 | 33 | import rx.internal.util.RxRingBuffer;
|
30 | 34 | import rx.observers.TestSubscriber;
|
31 | 35 | import rx.schedulers.Schedulers;
|
@@ -117,6 +121,33 @@ public void onNext(Long t) {
|
117 | 121 | }});
|
118 | 122 | assertEquals(n, count.get());
|
119 | 123 | }
|
| 124 | + |
| 125 | + @Test |
| 126 | + public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() { |
| 127 | + final AtomicBoolean errorOccurred = new AtomicBoolean(false); |
| 128 | + //request 0 |
| 129 | + TestSubscriber<Long> ts = TestSubscriber.create(0); |
| 130 | + //range method emits regardless of requests so should trigger onBackpressureDrop action |
| 131 | + range(2) |
| 132 | + // if haven't caught exception in onBackpressureDrop operator then would incorrectly |
| 133 | + // be picked up by this call to doOnError |
| 134 | + .doOnError(new Action1<Throwable>() { |
| 135 | + @Override |
| 136 | + public void call(Throwable t) { |
| 137 | + errorOccurred.set(true); |
| 138 | + } |
| 139 | + }) |
| 140 | + .onBackpressureDrop(THROW_NON_FATAL) |
| 141 | + .subscribe(ts); |
| 142 | + assertFalse(errorOccurred.get()); |
| 143 | + } |
| 144 | + |
| 145 | + private static final Action1<Long> THROW_NON_FATAL = new Action1<Long>() { |
| 146 | + @Override |
| 147 | + public void call(Long n) { |
| 148 | + throw new RuntimeException(); |
| 149 | + } |
| 150 | + }; |
120 | 151 |
|
121 | 152 | static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {
|
122 | 153 |
|
|
0 commit comments