Skip to content

Commit ab96a98

Browse files
committed
prevent OperatorTake from requesting more than needed
1 parent ae09b86 commit ab96a98

File tree

2 files changed

+42
-7
lines changed

2 files changed

+42
-7
lines changed

src/main/java/rx/internal/operators/OperatorTake.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicLong;
19+
1820
import rx.Observable.Operator;
1921
import rx.Producer;
2022
import rx.Subscriber;
@@ -78,15 +80,22 @@ public void onNext(T i) {
7880
@Override
7981
public void setProducer(final Producer producer) {
8082
child.setProducer(new Producer() {
81-
83+
84+
// keeps track of requests up to maximum of `limit`
85+
final AtomicLong requested = new AtomicLong(0);
86+
8287
@Override
8388
public void request(long n) {
84-
if (!completed) {
85-
long c = limit - count;
86-
if (n < c) {
87-
producer.request(n);
88-
} else {
89-
producer.request(c);
89+
if (n >0 && !completed) {
90+
// because requests may happen concurrently use a CAS loop to
91+
// ensure we only request as much as needed, no more no less
92+
while (true) {
93+
long r = requested.get();
94+
long c = Math.min(n, limit - requested.get());
95+
if (requested.compareAndSet(r, r + c)) {
96+
producer.request(c);
97+
break;
98+
}
9099
}
91100
}
92101
}

src/test/java/rx/internal/operators/OperatorTakeTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import java.util.Arrays;
2929
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.concurrent.atomic.AtomicLong;
@@ -388,4 +389,29 @@ public void call(Integer t1) {
388389
latch.await();
389390
assertNull(exception.get());
390391
}
392+
393+
@Test
394+
public void testDoesntRequestMoreThanNeededFromUpstream() throws InterruptedException {
395+
final AtomicLong requests = new AtomicLong();
396+
TestSubscriber<Long> ts = new TestSubscriber<Long>(0);
397+
Observable.interval(100, TimeUnit.MILLISECONDS)
398+
//
399+
.doOnRequest(new Action1<Long>() {
400+
@Override
401+
public void call(Long n) {
402+
requests.addAndGet(n);
403+
}})
404+
//
405+
.take(2)
406+
//
407+
.subscribe(ts);
408+
Thread.sleep(50);
409+
ts.requestMore(1);
410+
ts.requestMore(1);
411+
ts.requestMore(1);
412+
ts.awaitTerminalEvent();
413+
ts.assertCompleted();
414+
ts.assertNoErrors();
415+
assertEquals(2,requests.get());
416+
}
391417
}

0 commit comments

Comments
 (0)