Skip to content

Commit 1f5ed40

Browse files
committed
Observable.from(iterable) should emit onCompleted even if none requested when iterable is empty
1 parent aefdebb commit 1f5ed40

File tree

2 files changed

+36
-7
lines changed

2 files changed

+36
-7
lines changed

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@ public OnSubscribeFromIterable(Iterable<? extends T> iterable) {
4444
@Override
4545
public void call(final Subscriber<? super T> o) {
4646
final Iterator<? extends T> it = is.iterator();
47-
o.setProducer(new IterableProducer<T>(o, it));
47+
if (!it.hasNext() && !o.isUnsubscribed())
48+
o.onCompleted();
49+
else
50+
o.setProducer(new IterableProducer<T>(o, it));
4851
}
4952

5053
private static final class IterableProducer<T> implements Producer {
@@ -62,12 +65,11 @@ private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
6265

6366
@Override
6467
public void request(long n) {
65-
if (REQUESTED_UPDATER.get(this) == Long.MAX_VALUE) {
68+
if (requested == Long.MAX_VALUE) {
6669
// already started with fast-path
6770
return;
6871
}
69-
if (n == Long.MAX_VALUE) {
70-
REQUESTED_UPDATER.set(this, n);
72+
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
7173
// fast-path without backpressure
7274
while (it.hasNext()) {
7375
if (o.isUnsubscribed()) {
@@ -78,7 +80,7 @@ public void request(long n) {
7880
if (!o.isUnsubscribed()) {
7981
o.onCompleted();
8082
}
81-
} else if(n > 0) {
83+
} else if (n > 0) {
8284
// backpressure is requested
8385
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
8486
if (_c == 0) {

src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Iterator;
2828
import java.util.concurrent.CountDownLatch;
2929
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3031

3132
import org.junit.Test;
3233
import org.mockito.Mockito;
@@ -74,12 +75,12 @@ public Iterator<String> iterator() {
7475

7576
@Override
7677
public boolean hasNext() {
77-
return i++ < 3;
78+
return i < 3;
7879
}
7980

8081
@Override
8182
public String next() {
82-
return String.valueOf(i);
83+
return String.valueOf(++i);
8384
}
8485

8586
@Override
@@ -193,5 +194,31 @@ public void onNext(Integer t) {
193194
assertTrue(latch.await(10, TimeUnit.SECONDS));
194195
}
195196

197+
@Test
198+
public void testFromEmptyIterableWhenZeroRequestedShouldStillEmitOnCompletedEagerly() {
199+
final AtomicBoolean completed = new AtomicBoolean(false);
200+
Observable.from(Collections.emptyList()).subscribe(new Subscriber<Object>() {
201+
202+
@Override
203+
public void onStart() {
204+
request(0);
205+
}
206+
207+
@Override
208+
public void onCompleted() {
209+
completed.set(true);
210+
}
196211

212+
@Override
213+
public void onError(Throwable e) {
214+
215+
}
216+
217+
@Override
218+
public void onNext(Object t) {
219+
220+
}});
221+
assertTrue(completed.get());
222+
}
223+
197224
}

0 commit comments

Comments
 (0)