Skip to content

Commit 502405d

Browse files
Merge pull request #1771 from neerajrj/onErrorReturnBackpressure
On error return backpressure
2 parents 8ed945f + 0b01c02 commit 502405d

File tree

4 files changed

+93
-0
lines changed

4 files changed

+93
-0
lines changed

src/main/java/rx/internal/operators/OperatorOnErrorReturn.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Arrays;
1919

2020
import rx.Observable.Operator;
21+
import rx.Producer;
2122
import rx.Subscriber;
2223
import rx.exceptions.CompositeException;
2324
import rx.exceptions.Exceptions;
@@ -92,6 +93,16 @@ public void onCompleted() {
9293
child.onCompleted();
9394
}
9495

96+
@Override
97+
public void setProducer(final Producer producer) {
98+
child.setProducer(new Producer() {
99+
@Override
100+
public void request(long n) {
101+
producer.request(n);
102+
}
103+
});
104+
}
105+
95106
};
96107
child.add(parent);
97108
return parent;

src/main/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable;
19+
import rx.Producer;
1920
import rx.Observable.Operator;
2021
import rx.Subscriber;
2122
import rx.exceptions.Exceptions;
@@ -92,6 +93,16 @@ public void onCompleted() {
9293
child.onCompleted();
9394
}
9495

96+
@Override
97+
public void setProducer(final Producer producer) {
98+
child.setProducer(new Producer() {
99+
@Override
100+
public void request(long n) {
101+
producer.request(n);
102+
}
103+
});
104+
}
105+
95106
};
96107
child.add(s);
97108

src/test/java/rx/internal/operators/OperatorOnErrorReturnTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.Subscriber;
3333
import rx.functions.Func1;
3434
import rx.observers.TestSubscriber;
35+
import rx.schedulers.Schedulers;
3536

3637
public class OperatorOnErrorReturnTest {
3738

@@ -145,6 +146,41 @@ public String call(Throwable t1) {
145146
verify(observer, Mockito.never()).onNext("three");
146147
verify(observer, times(1)).onNext("resume");
147148
}
149+
150+
@Test
151+
public void testBackpressure() {
152+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
153+
Observable.range(0, 100000)
154+
.onErrorReturn(new Func1<Throwable, Integer>() {
155+
156+
@Override
157+
public Integer call(Throwable t1) {
158+
return 1;
159+
}
160+
161+
})
162+
.observeOn(Schedulers.computation())
163+
.map(new Func1<Integer, Integer>() {
164+
int c = 0;
165+
166+
@Override
167+
public Integer call(Integer t1) {
168+
if (c++ <= 1) {
169+
// slow
170+
try {
171+
Thread.sleep(500);
172+
} catch (InterruptedException e) {
173+
e.printStackTrace();
174+
}
175+
}
176+
return t1;
177+
}
178+
179+
})
180+
.subscribe(ts);
181+
ts.awaitTerminalEvent();
182+
ts.assertNoErrors();
183+
}
148184

149185
private static class TestObservable implements Observable.OnSubscribe<String> {
150186

@@ -180,4 +216,7 @@ public void run() {
180216
System.out.println("done starting TestObservable thread");
181217
}
182218
}
219+
220+
221+
183222
}

src/test/java/rx/internal/operators/OperatorOnExceptionResumeNextViaObservableTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import rx.Observer;
3131
import rx.Subscriber;
3232
import rx.functions.Func1;
33+
import rx.observers.TestSubscriber;
34+
import rx.schedulers.Schedulers;
3335

3436
public class OperatorOnExceptionResumeNextViaObservableTest {
3537

@@ -188,6 +190,36 @@ public String call(String s) {
188190
verify(observer, Mockito.never()).onError(any(Throwable.class));
189191
verify(observer, times(1)).onCompleted();
190192
}
193+
194+
195+
@Test
196+
public void testBackpressure() {
197+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
198+
Observable.range(0, 100000)
199+
.onExceptionResumeNext(Observable.just(1))
200+
.observeOn(Schedulers.computation())
201+
.map(new Func1<Integer, Integer>() {
202+
int c = 0;
203+
204+
@Override
205+
public Integer call(Integer t1) {
206+
if (c++ <= 1) {
207+
// slow
208+
try {
209+
Thread.sleep(500);
210+
} catch (InterruptedException e) {
211+
e.printStackTrace();
212+
}
213+
}
214+
return t1;
215+
}
216+
217+
})
218+
.subscribe(ts);
219+
ts.awaitTerminalEvent();
220+
ts.assertNoErrors();
221+
}
222+
191223

192224
private static class TestObservable implements Observable.OnSubscribe<String> {
193225

0 commit comments

Comments
 (0)