Skip to content

Commit 3fbbfa9

Browse files
committed
don't decrement if consumerCapacity is 0
1 parent 669934e commit 3fbbfa9

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import rx.Producer;
4545
import rx.Scheduler;
4646
import rx.Subscriber;
47+
import rx.exceptions.MissingBackpressureException;
4748
import rx.functions.Action0;
4849
import rx.functions.Func1;
4950
import rx.functions.Func2;
@@ -238,11 +239,16 @@ public void onNext(T v) {
238239
child.onNext(v);
239240
while (true) {
240241
long cc = consumerCapacity.get();
241-
if (cc != Long.MAX_VALUE) {
242-
if (consumerCapacity.compareAndSet(cc, cc -1)) {
242+
if (cc == 0) {
243+
child.onError(new MissingBackpressureException(
244+
"an item has arrived to this operator that was not requested, "
245+
+ "please use backpressure aware operators upstream (or append .onBackpressureBuffer() or similar)"));
246+
break;
247+
} else if (cc != Long.MAX_VALUE) {
248+
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
243249
break;
244250
}
245-
} else {
251+
} else {
246252
break;
247253
}
248254
}
@@ -320,10 +326,14 @@ public void onError(Throwable e) {
320326

321327
@Override
322328
public void onNext(Object t) {
329+
// a restart instruction has arrived
323330
if (!isLocked.get() && !child.isUnsubscribed()) {
331+
// if there are outstanding requests
324332
if (consumerCapacity.get() > 0) {
333+
// schedule resubscription
325334
worker.schedule(subscribeToSource);
326335
} else {
336+
// otherwise we indicate that on the next request we should resubscribe
327337
resumeBoundary.compareAndSet(false, true);
328338
}
329339
}

0 commit comments

Comments
 (0)