Skip to content

Commit 5ec4f76

Browse files
authored
2.x: fix flatMapIterable appearing to be empty when fused (#5256)
1 parent ba5edc9 commit 5ec4f76

File tree

3 files changed

+64
-12
lines changed

3 files changed

+64
-12
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,10 @@ public void clear() {
411411
@Override
412412
public boolean isEmpty() {
413413
Iterator<? extends R> it = current;
414-
return (it != null && !it.hasNext()) || queue.isEmpty();
414+
if (it == null) {
415+
return queue.isEmpty();
416+
}
417+
return !it.hasNext();
415418
}
416419

417420
@Nullable

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterableTest.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -863,4 +863,55 @@ public void remove() {
863863

864864
ts.assertResult(1);
865865
}
866+
867+
@Test
868+
public void doubleShare() {
869+
Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
870+
Flowable.just(it, it)
871+
.flatMapIterable(Functions.<Iterable<Integer>>identity())
872+
.share()
873+
.share()
874+
.count()
875+
.test()
876+
.assertResult(600L);
877+
}
878+
879+
@Test
880+
public void multiShare() {
881+
Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
882+
for (int i = 0; i < 5; i++) {
883+
Flowable<Integer> f = Flowable.just(it, it)
884+
.flatMapIterable(Functions.<Iterable<Integer>>identity());
885+
886+
for (int j = 0; j < i; j++) {
887+
f = f.share();
888+
}
889+
890+
f
891+
.count()
892+
.test()
893+
.withTag("Share: " + i)
894+
.assertResult(600L);
895+
}
896+
}
897+
898+
@Test
899+
public void multiShareHidden() {
900+
Iterable<Integer> it = Flowable.range(1, 300).blockingIterable();
901+
for (int i = 0; i < 5; i++) {
902+
Flowable<Integer> f = Flowable.just(it, it)
903+
.flatMapIterable(Functions.<Iterable<Integer>>identity())
904+
.hide();
905+
906+
for (int j = 0; j < i; j++) {
907+
f = f.share();
908+
}
909+
910+
f
911+
.count()
912+
.test()
913+
.withTag("Share: " + i)
914+
.assertResult(600L);
915+
}
916+
}
866917
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableToListTest.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ public void onNextCancelRace() {
395395
for (int i = 0; i < 1000; i++) {
396396
final PublishProcessor<Integer> pp = PublishProcessor.create();
397397
final TestObserver<List<Integer>> ts = pp.toList().test();
398-
398+
399399
Runnable r1 = new Runnable() {
400400
@Override
401401
public void run() {
@@ -408,18 +408,17 @@ public void run() {
408408
ts.cancel();
409409
}
410410
};
411-
411+
412412
TestHelper.race(r1, r2);
413413
}
414-
415414
}
416415

417416
@Test
418417
public void onNextCancelRaceFlowable() {
419418
for (int i = 0; i < 1000; i++) {
420419
final PublishProcessor<Integer> pp = PublishProcessor.create();
421420
final TestSubscriber<List<Integer>> ts = pp.toList().toFlowable().test();
422-
421+
423422
Runnable r1 = new Runnable() {
424423
@Override
425424
public void run() {
@@ -432,20 +431,20 @@ public void run() {
432431
ts.cancel();
433432
}
434433
};
435-
434+
436435
TestHelper.race(r1, r2);
437436
}
438-
437+
439438
}
440439

441440
@Test
442441
public void onCompleteCancelRaceFlowable() {
443442
for (int i = 0; i < 1000; i++) {
444443
final PublishProcessor<Integer> pp = PublishProcessor.create();
445444
final TestSubscriber<List<Integer>> ts = pp.toList().toFlowable().test();
446-
445+
447446
pp.onNext(1);
448-
447+
449448
Runnable r1 = new Runnable() {
450449
@Override
451450
public void run() {
@@ -458,14 +457,13 @@ public void run() {
458457
ts.cancel();
459458
}
460459
};
461-
460+
462461
TestHelper.race(r1, r2);
463-
462+
464463
if (ts.valueCount() != 0) {
465464
ts.assertValue(Arrays.asList(1))
466465
.assertNoErrors();
467466
}
468467
}
469-
470468
}
471469
}

0 commit comments

Comments
 (0)