Skip to content

Commit 2736df6

Browse files
committed
emit MissingBackpressureException
1 parent 3fbbfa9 commit 2736df6

File tree

1 file changed

+21
-13
lines changed

1 file changed

+21
-13
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -236,21 +236,29 @@ public void onError(Throwable e) {
236236
@Override
237237
public void onNext(T v) {
238238
if (!done) {
239-
child.onNext(v);
240-
while (true) {
241-
long cc = consumerCapacity.get();
242-
if (cc == 0) {
243-
child.onError(new MissingBackpressureException(
244-
"an item has arrived to this operator that was not requested, "
245-
+ "please use backpressure aware operators upstream (or append .onBackpressureBuffer() or similar)"));
246-
break;
247-
} else if (cc != Long.MAX_VALUE) {
248-
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
249-
break;
250-
}
251-
} else {
239+
if (consumerCapacity.get() == 0) {
240+
child.onError(new MissingBackpressureException(
241+
"an item has arrived to this operator that was not requested, "
242+
+ "please use backpressure aware operators upstream (or insert .onBackpressureBuffer() or similar)"));
243+
return;
244+
} else {
245+
child.onNext(v);
246+
decrementConsumerCapacity();
247+
}
248+
}
249+
}
250+
251+
private void decrementConsumerCapacity() {
252+
// use a CAS loop because we don't want to decrement the value
253+
// if it is Long.MAX_VALUE
254+
while (true) {
255+
long cc = consumerCapacity.get();
256+
if (cc != Long.MAX_VALUE) {
257+
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
252258
break;
253259
}
260+
} else {
261+
break;
254262
}
255263
}
256264
}

0 commit comments

Comments
 (0)