File tree Expand file tree Collapse file tree 1 file changed +4
-5
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +4
-5
lines changed Original file line number Diff line number Diff line change 33
33
34
34
import static rx .Observable .create ;
35
35
36
- import java .util .Random ;
37
36
import java .util .concurrent .atomic .AtomicBoolean ;
38
37
import java .util .concurrent .atomic .AtomicLong ;
39
38
import java .util .concurrent .atomic .AtomicReference ;
@@ -376,10 +375,10 @@ public void request(final long n) {
376
375
long c = BackpressureUtils .getAndAddRequest (consumerCapacity , n );
377
376
Producer producer = currentProducer .get ();
378
377
if (producer != null ) {
379
- // TODO what if at this point the subscription finishes and currentProducer
380
- // is set to null or even the next producer. The request would be added to consumerCapacity but
381
- // if more requests only come after emission then this call to the old producer could produce
382
- // nothing and the stream would stall
378
+ // if at this point the subscription finishes and currentProducer
379
+ // is set to null or even the next producer then we are still ok because even
380
+ // though the producer may be inactive below, the resubscription that changed the
381
+ // producer will call setProducer and request with consumerCapacity on the new producer.
383
382
producer .request (n );
384
383
} else
385
384
if (c == 0 && resumeBoundary .compareAndSet (true , false )) {
You can’t perform that action at this time.
0 commit comments