Skip to content

Commit c5a4c37

Browse files
committed
Merge pull request #3528 from zsxwing/zipIter-delay
1.x: Avoid to call next when Iterator is drained
2 parents bbc85bf + 2fc3e98 commit c5a4c37

File tree

2 files changed

+31
-3
lines changed

2 files changed

+31
-3
lines changed

src/main/java/rx/internal/operators/OperatorZipIterable.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,30 @@ public Subscriber<? super T1> call(final Subscriber<? super R> subscriber) {
4646
return Subscribers.empty();
4747
}
4848
return new Subscriber<T1>(subscriber) {
49-
boolean once;
49+
boolean done;
5050
@Override
5151
public void onCompleted() {
52-
if (once) {
52+
if (done) {
5353
return;
5454
}
55-
once = true;
55+
done = true;
5656
subscriber.onCompleted();
5757
}
5858

5959
@Override
6060
public void onError(Throwable e) {
61+
if (done) {
62+
return;
63+
}
64+
done = true;
6165
subscriber.onError(e);
6266
}
6367

6468
@Override
6569
public void onNext(T1 t) {
70+
if (done) {
71+
return;
72+
}
6673
try {
6774
subscriber.onNext(zipFunction.call(t, iterator.next()));
6875
if (!iterator.hasNext()) {

src/test/java/rx/internal/operators/OperatorZipIterableTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
import java.util.Arrays;
2525
import java.util.Iterator;
26+
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicInteger;
2728
import static org.junit.Assert.assertEquals;
2829

@@ -37,6 +38,8 @@
3738
import rx.functions.Func1;
3839
import rx.functions.Func2;
3940
import rx.functions.Func3;
41+
import rx.observers.TestSubscriber;
42+
import rx.schedulers.TestScheduler;
4043
import rx.subjects.PublishSubject;
4144

4245
public class OperatorZipIterableTest {
@@ -378,4 +381,22 @@ public String call(Integer t1) {
378381

379382
assertEquals(2, squareStr.counter.get());
380383
}
384+
385+
@Test
386+
public void testZipIterableWithDelay() {
387+
TestScheduler scheduler = new TestScheduler();
388+
Observable<Integer> o = Observable.just(1, 2).zipWith(Arrays.asList(1), new Func2<Integer, Integer, Integer>() {
389+
@Override
390+
public Integer call(Integer v1, Integer v2) {
391+
return v1;
392+
}
393+
}).delay(500, TimeUnit.MILLISECONDS, scheduler);
394+
395+
TestSubscriber<Integer> subscriber = new TestSubscriber<Integer>();
396+
o.subscribe(subscriber);
397+
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
398+
subscriber.assertValue(1);
399+
subscriber.assertNoErrors();
400+
subscriber.assertCompleted();
401+
}
381402
}

0 commit comments

Comments
 (0)