Skip to content

Commit d827e06

Browse files
committed
add TODOs and comments
1 parent 2736df6 commit d827e06

File tree

2 files changed

+25
-0
lines changed

2 files changed

+25
-0
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
import static rx.Observable.create;
3535

36+
import java.util.Random;
3637
import java.util.concurrent.atomic.AtomicBoolean;
3738
import java.util.concurrent.atomic.AtomicLong;
3839
import java.util.concurrent.atomic.AtomicReference;
@@ -339,9 +340,19 @@ public void onNext(Object t) {
339340
// if there are outstanding requests
340341
if (consumerCapacity.get() > 0) {
341342
// schedule resubscription
343+
// note that at this point consumerCapacity may have dropped to zero
344+
// but this is a best endeavours check to try not to resubscribe more than
345+
// we need to
342346
worker.schedule(subscribeToSource);
343347
} else {
344348
// otherwise we indicate that on the next request we should resubscribe
349+
// because it is a potential waste of resources to schedule resubscription
350+
// now when there is the possibility that the child may unsubscribe and never use
351+
// that resubscription
352+
353+
//TODO what if consumerCapacity increases now then we are in a situation where the
354+
// next call of request(n) will do nothing because of the c ==0 check in request(n)
355+
// this would cause the stream to stall?
345356
resumeBoundary.compareAndSet(false, true);
346357
}
347358
}
@@ -359,9 +370,16 @@ public void setProducer(Producer producer) {
359370

360371
@Override
361372
public void request(final long n) {
373+
// TODO at this point we usually (?) add some optimizations
374+
// like detecting that we are already on the fast path (non-backpressure)
375+
// and not attempting to process additional requests
362376
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
363377
Producer producer = currentProducer.get();
364378
if (producer != null) {
379+
// TODO what if at this point the subscription finishes and currentProducer
380+
// is set to null or even the next producer. The request would be added to consumerCapacity but
381+
// if it more requests only come after emission then this call to the old producer could produce
382+
// nothing and the stream would stall
365383
producer.request(n);
366384
} else
367385
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -708,6 +708,7 @@ public void testRetryWithBackpressure() throws InterruptedException {
708708

709709
@Test(timeout = 15000)
710710
public void testRetryWithBackpressureParallel() throws InterruptedException {
711+
final long startTime = System.currentTimeMillis();
711712
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
712713
int ncpu = Runtime.getRuntime().availableProcessors();
713714
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 2));
@@ -718,11 +719,17 @@ public void testRetryWithBackpressureParallel() throws InterruptedException {
718719

719720
int m = 5000;
720721
final CountDownLatch cdl = new CountDownLatch(m);
722+
721723
for (int i = 0; i < m; i++) {
722724
final int j = i;
723725
exec.execute(new Runnable() {
724726
@Override
725727
public void run() {
728+
if (System.currentTimeMillis() - startTime >12000) {
729+
//don't start this job if we are close to test timeout
730+
cdl.countDown();
731+
return;
732+
}
726733
final AtomicInteger nexts = new AtomicInteger();
727734
try {
728735
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));

0 commit comments

Comments
 (0)