Skip to content

Commit 4f3fa5b

Browse files
committed
ensure iterator.hasNext is not called unnecessarily as per #3006
1 parent 2bf39a7 commit 4f3fa5b

File tree

2 files changed

+115
-17
lines changed

2 files changed

+115
-17
lines changed

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -71,42 +71,52 @@ public void request(long n) {
7171
}
7272
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
7373
// fast-path without backpressure
74-
while (it.hasNext()) {
74+
75+
while (true) {
7576
if (o.isUnsubscribed()) {
7677
return;
78+
} else if (it.hasNext()) {
79+
o.onNext(it.next());
80+
} else if (!o.isUnsubscribed()) {
81+
o.onCompleted();
82+
return;
83+
} else {
84+
// is unsubscribed
85+
return;
7786
}
78-
o.onNext(it.next());
79-
}
80-
if (!o.isUnsubscribed()) {
81-
o.onCompleted();
8287
}
8388
} else if (n > 0) {
8489
// backpressure is requested
8590
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
8691
if (_c == 0) {
8792
while (true) {
8893
/*
89-
* This complicated logic is done to avoid touching the volatile `requested` value
90-
* during the loop itself. If it is touched during the loop the performance is impacted significantly.
94+
* This complicated logic is done to avoid touching the
95+
* volatile `requested` value during the loop itself. If
96+
* it is touched during the loop the performance is
97+
* impacted significantly.
9198
*/
9299
long r = requested;
93100
long numToEmit = r;
94-
while (it.hasNext() && --numToEmit >= 0) {
101+
while (true) {
95102
if (o.isUnsubscribed()) {
96103
return;
97-
}
98-
o.onNext(it.next());
99-
100-
}
101-
102-
if (!it.hasNext()) {
103-
if (!o.isUnsubscribed()) {
104+
} else if (it.hasNext()) {
105+
if (--numToEmit >= 0) {
106+
o.onNext(it.next());
107+
} else
108+
break;
109+
} else if (!o.isUnsubscribed()) {
104110
o.onCompleted();
111+
return;
112+
} else {
113+
// is unsubscribed
114+
return;
105115
}
106-
return;
107116
}
108117
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
109-
// we're done emitting the number requested so return
118+
// we're done emitting the number requested so
119+
// return
110120
return;
111121
}
112122

src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertFalse;
1819
import static org.junit.Assert.assertTrue;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.mock;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.TimeUnit;
3031
import java.util.concurrent.atomic.AtomicBoolean;
3132

33+
import org.junit.Assert;
3234
import org.junit.Test;
3335
import org.mockito.Mockito;
3436

@@ -221,4 +223,90 @@ public void onNext(Object t) {
221223
assertTrue(completed.get());
222224
}
223225

226+
@Test
227+
public void testDoesNotCallIteratorHasNextMoreThanRequiredWithBackpressure() {
228+
final AtomicBoolean called = new AtomicBoolean(false);
229+
Iterable<Integer> iterable = new Iterable<Integer>() {
230+
231+
@Override
232+
public Iterator<Integer> iterator() {
233+
return new Iterator<Integer>() {
234+
235+
int count = 1;
236+
237+
@Override
238+
public boolean hasNext() {
239+
if (count > 1) {
240+
called.set(true);
241+
return false;
242+
} else
243+
return true;
244+
}
245+
246+
@Override
247+
public Integer next() {
248+
return count++;
249+
}
250+
251+
};
252+
}
253+
};
254+
Observable.from(iterable).take(1).subscribe();
255+
assertFalse(called.get());
256+
}
257+
258+
@Test
259+
public void testDoesNotCallIteratorHasNextMoreThanRequiredFastPath() {
260+
final AtomicBoolean called = new AtomicBoolean(false);
261+
Iterable<Integer> iterable = new Iterable<Integer>() {
262+
263+
@Override
264+
public Iterator<Integer> iterator() {
265+
return new Iterator<Integer>() {
266+
267+
@Override
268+
public void remove() {
269+
// ignore
270+
}
271+
272+
int count = 1;
273+
274+
@Override
275+
public boolean hasNext() {
276+
if (count > 1) {
277+
called.set(true);
278+
return false;
279+
} else
280+
return true;
281+
}
282+
283+
@Override
284+
public Integer next() {
285+
return count++;
286+
}
287+
288+
};
289+
}
290+
};
291+
Observable.from(iterable).subscribe(new Subscriber<Integer>() {
292+
293+
@Override
294+
public void onCompleted() {
295+
296+
}
297+
298+
@Override
299+
public void onError(Throwable e) {
300+
301+
}
302+
303+
@Override
304+
public void onNext(Integer t) {
305+
// unsubscribe on first emission
306+
unsubscribe();
307+
}
308+
});
309+
assertFalse(called.get());
310+
}
311+
224312
}

0 commit comments

Comments
 (0)