Skip to content

Commit 1dd023f

Browse files
yurgisbaykshtisakarnokd
authored andcommitted
fix for #5429 (#5430)
1 parent 433b099 commit 1dd023f

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

src/main/java/rx/observables/AsyncOnSubscribe.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -532,8 +532,11 @@ boolean tryEmit(long n) {
532532
onNextCalled = false;
533533
expectedDelivery = n;
534534
nextIteration(n);
535-
536-
if (hasTerminated || isUnsubscribed()) {
535+
536+
//hasTerminated will be true when onCompleted was already emitted from the request callback
537+
//even if the the observer has not seen onCompleted from the requested observable,
538+
//so we should not clean up while there are active subscriptions
539+
if (hasTerminated && !subscriptions.hasSubscriptions() || isUnsubscribed()) {
537540
cleanup();
538541
return true;
539542
}

src/test/java/rx/observables/AsyncOnSubscribeTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,4 +483,38 @@ public Integer call(Integer state, Long requested, Observer<Observable<? extends
483483

484484
subscriber.assertNotCompleted();
485485
}
486+
487+
@Test
488+
public void testMergeDelayedWithScalar() {
489+
final TestScheduler scheduler = new TestScheduler();
490+
Observable<Integer> os = Observable.create(AsyncOnSubscribe.<Integer, Integer> createStateful(
491+
new Func0<Integer>() {
492+
493+
@Override
494+
public Integer call() {
495+
return 0;
496+
}
497+
498+
},
499+
new Func3<Integer, Long, Observer<Observable<? extends Integer>>, Integer>() {
500+
501+
@Override
502+
public Integer call(Integer state, Long requested, Observer<Observable<? extends Integer>> emitter) {
503+
if (state == 0) {
504+
emitter.onNext(Observable.range(0,100).delay(1, TimeUnit.SECONDS, scheduler));
505+
} else {
506+
emitter.onCompleted();
507+
}
508+
return state + 1;
509+
}
510+
511+
}));
512+
513+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
514+
os.mergeWith(Observable.just(0)).subscribe(ts);
515+
scheduler.advanceTimeBy(1, TimeUnit.HOURS);
516+
ts.assertCompleted();
517+
ts.assertValueCount(101);
518+
}
519+
486520
}

0 commit comments

Comments
 (0)