Skip to content

Commit d1a8739

Browse files
committed
Request data in batches.
1 parent f54252f commit d1a8739

File tree

2 files changed

+21
-10
lines changed

2 files changed

+21
-10
lines changed

src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import rx.Observable;
2525
import rx.Subscriber;
2626
import rx.exceptions.Exceptions;
27+
import rx.internal.util.RxRingBuffer;
2728

2829
/**
2930
* Returns an Iterator that iterates over all items emitted by a specified Observable.
@@ -56,17 +57,19 @@ public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
5657
public static final class SubscriberIterator<T>
5758
extends Subscriber<Notification<? extends T>> implements Iterator<T> {
5859

60+
static final int LIMIT = 3 * RxRingBuffer.SIZE / 4;
61+
5962
private final BlockingQueue<Notification<? extends T>> notifications;
6063
private Notification<? extends T> buf;
64+
private int received;
6165

6266
public SubscriberIterator() {
6367
this.notifications = new LinkedBlockingQueue<Notification<? extends T>>();
64-
this.buf = null;
6568
}
6669

6770
@Override
6871
public void onStart() {
69-
request(0);
72+
request(RxRingBuffer.SIZE);
7073
}
7174

7275
@Override
@@ -87,8 +90,12 @@ public void onNext(Notification<? extends T> args) {
8790
@Override
8891
public boolean hasNext() {
8992
if (buf == null) {
90-
request(1);
9193
buf = take();
94+
received++;
95+
if (received >= LIMIT) {
96+
request(received);
97+
received = 0;
98+
}
9299
}
93100
if (buf.isOnError()) {
94101
throw Exceptions.propagate(buf.getThrowable());

src/test/java/rx/internal/operators/BlockingOperatorToIteratorTest.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import rx.Observable.OnSubscribe;
2727
import rx.Subscriber;
2828
import rx.exceptions.TestException;
29+
import rx.internal.operators.BlockingOperatorToIterator.SubscriberIterator;
30+
import rx.internal.util.RxRingBuffer;
2931

3032
public class BlockingOperatorToIteratorTest {
3133

@@ -96,26 +98,28 @@ public Iterator<Integer> iterator() {
9698
Iterator<Integer> it = toIterator(obs);
9799
while (it.hasNext()) {
98100
// Correct backpressure should cause this interleaved behavior.
101+
// We first request RxRingBuffer.SIZE. Then in increments of
102+
// SubscriberIterator.LIMIT.
99103
int i = it.next();
100-
assertEquals(i + 1, src.count);
104+
int expected = i - (i % SubscriberIterator.LIMIT) + RxRingBuffer.SIZE;
105+
expected = Math.min(expected, Counter.MAX);
106+
107+
assertEquals(expected, src.count);
101108
}
102109
}
103110

104111
public static final class Counter implements Iterator<Integer> {
112+
static final int MAX = 5 * RxRingBuffer.SIZE;
105113
public int count;
106114

107-
public Counter() {
108-
this.count = 0;
109-
}
110-
111115
@Override
112116
public boolean hasNext() {
113-
return count < 5;
117+
return count < MAX;
114118
}
115119

116120
@Override
117121
public Integer next() {
118-
return count++;
122+
return ++count;
119123
}
120124

121125
@Override

0 commit comments

Comments
 (0)