Skip to content

Commit e886c50

Browse files
Merge pull request #1765 from neerajrj/onErrorResumeBackpressure
backpressure support in onErrorResumeNext* operators
2 parents 78b7b59 + 4186590 commit e886c50

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaFunction.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable;
19+
import rx.Producer;
1920
import rx.Observable.Operator;
2021
import rx.Subscriber;
2122
import rx.exceptions.Exceptions;
@@ -87,6 +88,16 @@ public void onNext(T t) {
8788
}
8889
child.onNext(t);
8990
}
91+
92+
@Override
93+
public void setProducer(final Producer producer) {
94+
child.setProducer(new Producer() {
95+
@Override
96+
public void request(long n) {
97+
producer.request(n);
98+
}
99+
});
100+
}
90101

91102
};
92103
child.add(parent);

src/main/java/rx/internal/operators/OperatorOnErrorResumeNextViaObservable.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import rx.Observable;
19+
import rx.Producer;
1920
import rx.Observable.Operator;
2021
import rx.Subscriber;
2122
import rx.exceptions.Exceptions;
@@ -84,6 +85,16 @@ public void onCompleted() {
8485
child.onCompleted();
8586
}
8687

88+
@Override
89+
public void setProducer(final Producer producer) {
90+
child.setProducer(new Producer() {
91+
@Override
92+
public void request(long n) {
93+
producer.request(n);
94+
}
95+
});
96+
}
97+
8798
};
8899
child.add(s);
89100

0 commit comments

Comments
 (0)