Skip to content

Commit 71d3d0f

Browse files
committed
Merge pull request #3640 from davidmoten/fix-onBackpressureDrop-error-handling
fix error handling in onBackpressureDrop
2 parents 2dbf686 + 9675189 commit 71d3d0f

File tree

2 files changed

+38
-1
lines changed

2 files changed

+38
-1
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.Observable.Operator;
2121
import rx.Producer;
2222
import rx.Subscriber;
23+
import rx.exceptions.Exceptions;
2324
import rx.functions.Action1;
2425

2526
public class OperatorOnBackpressureDrop<T> implements Operator<T, T> {
@@ -84,7 +85,12 @@ public void onNext(T t) {
8485
} else {
8586
// item dropped
8687
if(onDrop != null) {
87-
onDrop.call(t);
88+
try {
89+
onDrop.call(t);
90+
} catch (Throwable e) {
91+
Exceptions.throwOrReport(e, child, t);
92+
return;
93+
}
8894
}
8995
}
9096
}

src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
1920

2021
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2123
import java.util.concurrent.atomic.AtomicInteger;
2224

2325
import org.junit.Test;
@@ -26,6 +28,8 @@
2628
import rx.Observable.OnSubscribe;
2729
import rx.Observer;
2830
import rx.Subscriber;
31+
import rx.functions.Action0;
32+
import rx.functions.Action1;
2933
import rx.internal.util.RxRingBuffer;
3034
import rx.observers.TestSubscriber;
3135
import rx.schedulers.Schedulers;
@@ -117,6 +121,33 @@ public void onNext(Long t) {
117121
}});
118122
assertEquals(n, count.get());
119123
}
124+
125+
@Test
126+
public void testNonFatalExceptionFromOverflowActionIsNotReportedFromUpstreamOperator() {
127+
final AtomicBoolean errorOccurred = new AtomicBoolean(false);
128+
//request 0
129+
TestSubscriber<Long> ts = TestSubscriber.create(0);
130+
//range method emits regardless of requests so should trigger onBackpressureDrop action
131+
range(2)
132+
// if haven't caught exception in onBackpressureDrop operator then would incorrectly
133+
// be picked up by this call to doOnError
134+
.doOnError(new Action1<Throwable>() {
135+
@Override
136+
public void call(Throwable t) {
137+
errorOccurred.set(true);
138+
}
139+
})
140+
.onBackpressureDrop(THROW_NON_FATAL)
141+
.subscribe(ts);
142+
assertFalse(errorOccurred.get());
143+
}
144+
145+
private static final Action1<Long> THROW_NON_FATAL = new Action1<Long>() {
146+
@Override
147+
public void call(Long n) {
148+
throw new RuntimeException();
149+
}
150+
};
120151

121152
static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {
122153

0 commit comments

Comments
 (0)