Skip to content

Commit f54252f

Browse files
committed
Make BlockingOperatorToIterator exert backpressure.
1 parent 84622bb commit f54252f

File tree

2 files changed

+106
-51
lines changed

2 files changed

+106
-51
lines changed

src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

Lines changed: 64 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import rx.Notification;
2424
import rx.Observable;
2525
import rx.Subscriber;
26-
import rx.Subscription;
2726
import rx.exceptions.Exceptions;
2827

2928
/**
@@ -47,68 +46,82 @@ private BlockingOperatorToIterator() {
4746
* @return the iterator that could be used to iterate over the elements of the observable.
4847
*/
4948
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
50-
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();
49+
SubscriberIterator<T> subscriber = new SubscriberIterator<T>();
5150

5251
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
53-
final Subscription subscription = source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
54-
@Override
55-
public void onCompleted() {
56-
// ignore
57-
}
52+
source.materialize().subscribe(subscriber);
53+
return subscriber;
54+
}
5855

59-
@Override
60-
public void onError(Throwable e) {
61-
notifications.offer(Notification.<T>createOnError(e));
62-
}
56+
public static final class SubscriberIterator<T>
57+
extends Subscriber<Notification<? extends T>> implements Iterator<T> {
6358

64-
@Override
65-
public void onNext(Notification<? extends T> args) {
66-
notifications.offer(args);
67-
}
68-
});
59+
private final BlockingQueue<Notification<? extends T>> notifications;
60+
private Notification<? extends T> buf;
6961

70-
return new Iterator<T>() {
71-
private Notification<? extends T> buf;
62+
public SubscriberIterator() {
63+
this.notifications = new LinkedBlockingQueue<Notification<? extends T>>();
64+
this.buf = null;
65+
}
7266

73-
@Override
74-
public boolean hasNext() {
75-
if (buf == null) {
76-
buf = take();
77-
}
78-
if (buf.isOnError()) {
79-
throw Exceptions.propagate(buf.getThrowable());
80-
}
81-
return !buf.isOnCompleted();
67+
@Override
68+
public void onStart() {
69+
request(0);
70+
}
71+
72+
@Override
73+
public void onCompleted() {
74+
// ignore
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
notifications.offer(Notification.<T>createOnError(e));
80+
}
81+
82+
@Override
83+
public void onNext(Notification<? extends T> args) {
84+
notifications.offer(args);
85+
}
86+
87+
@Override
88+
public boolean hasNext() {
89+
if (buf == null) {
90+
request(1);
91+
buf = take();
92+
}
93+
if (buf.isOnError()) {
94+
throw Exceptions.propagate(buf.getThrowable());
8295
}
96+
return !buf.isOnCompleted();
97+
}
8398

84-
@Override
85-
public T next() {
86-
if (hasNext()) {
87-
T result = buf.getValue();
88-
buf = null;
89-
return result;
90-
}
91-
throw new NoSuchElementException();
99+
@Override
100+
public T next() {
101+
if (hasNext()) {
102+
T result = buf.getValue();
103+
buf = null;
104+
return result;
92105
}
106+
throw new NoSuchElementException();
107+
}
93108

94-
private Notification<? extends T> take() {
95-
try {
96-
Notification<? extends T> poll = notifications.poll();
97-
if (poll != null) {
98-
return poll;
99-
}
100-
return notifications.take();
101-
} catch (InterruptedException e) {
102-
subscription.unsubscribe();
103-
throw Exceptions.propagate(e);
109+
private Notification<? extends T> take() {
110+
try {
111+
Notification<? extends T> poll = notifications.poll();
112+
if (poll != null) {
113+
return poll;
104114
}
115+
return notifications.take();
116+
} catch (InterruptedException e) {
117+
unsubscribe();
118+
throw Exceptions.propagate(e);
105119
}
120+
}
106121

107-
@Override
108-
public void remove() {
109-
throw new UnsupportedOperationException("Read-only iterator");
110-
}
111-
};
122+
@Override
123+
public void remove() {
124+
throw new UnsupportedOperationException("Read-only iterator");
125+
}
112126
}
113-
114127
}

src/test/java/rx/internal/operators/BlockingOperatorToIteratorTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,4 +81,46 @@ public void call(Subscriber<? super String> subscriber) {
8181
System.out.println(string);
8282
}
8383
}
84+
85+
@Test
86+
public void testIteratorExertBackpressure() {
87+
final Counter src = new Counter();
88+
89+
Observable<Integer> obs = Observable.from(new Iterable<Integer>() {
90+
@Override
91+
public Iterator<Integer> iterator() {
92+
return src;
93+
}
94+
});
95+
96+
Iterator<Integer> it = toIterator(obs);
97+
while (it.hasNext()) {
98+
// Correct backpressure should cause this interleaved behavior.
99+
int i = it.next();
100+
assertEquals(i + 1, src.count);
101+
}
102+
}
103+
104+
public static final class Counter implements Iterator<Integer> {
105+
public int count;
106+
107+
public Counter() {
108+
this.count = 0;
109+
}
110+
111+
@Override
112+
public boolean hasNext() {
113+
return count < 5;
114+
}
115+
116+
@Override
117+
public Integer next() {
118+
return count++;
119+
}
120+
121+
@Override
122+
public void remove() {
123+
throw new UnsupportedOperationException();
124+
}
125+
}
84126
}

0 commit comments

Comments
 (0)