File tree Expand file tree Collapse file tree 1 file changed +3
-10
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +3
-10
lines changed Original file line number Diff line number Diff line change @@ -236,15 +236,8 @@ public void onError(Throwable e) {
236
236
@ Override
237
237
public void onNext (T v ) {
238
238
if (!done ) {
239
- if (consumerCapacity .get () == 0 ) {
240
- child .onError (new MissingBackpressureException (
241
- "an item has arrived to this operator that was not requested, "
242
- + "please use backpressure aware operators upstream (or insert .onBackpressureBuffer() or similar)" ));
243
- return ;
244
- } else {
245
- child .onNext (v );
246
- decrementConsumerCapacity ();
247
- }
239
+ child .onNext (v );
240
+ decrementConsumerCapacity ();
248
241
}
249
242
}
250
243
@@ -382,7 +375,7 @@ public void request(final long n) {
382
375
// producer will call setProducer and request with consumerCapacity on the new producer.
383
376
producer .request (n );
384
377
} else
385
- if (c == 0 && resumeBoundary .compareAndSet (true , false )) {
378
+ if (resumeBoundary .compareAndSet (true , false )) {
386
379
worker .schedule (subscribeToSource );
387
380
}
388
381
}
You can’t perform that action at this time.
0 commit comments