Skip to content

Commit 0d96903

Browse files
committed
Merge pull request #2455 from duncani/Issue#2191
Fix for #2191 - OperatorMulticast fails to unsubscribe from source
2 parents 833c6be + ce76ea9 commit 0d96903

File tree

2 files changed

+217
-6
lines changed

2 files changed

+217
-6
lines changed

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void call() {
143143
subscriptionIsNull = subscription == null;
144144
}
145145
if (!subscriptionIsNull)
146-
source.unsafeSubscribe(subscription);
146+
source.subscribe(subscription);
147147
}
148148
}
149149
}

src/test/java/rx/internal/operators/OperatorReplayTest.java

Lines changed: 216 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,22 @@
1616
package rx.internal.operators;
1717

1818
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.inOrder;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
2419

2520
import java.util.concurrent.TimeUnit;
2621
import java.util.concurrent.atomic.AtomicInteger;
2722
import static org.junit.Assert.assertEquals;
23+
import static org.mockito.Matchers.notNull;
24+
import static org.mockito.Mockito.*;
25+
2826
import org.junit.Test;
2927
import org.mockito.InOrder;
3028

3129

3230
import rx.Observable;
3331
import rx.Observer;
32+
import rx.Scheduler;
33+
import rx.Scheduler.Worker;
34+
import rx.Subscription;
3435
import rx.functions.Action0;
3536
import rx.functions.Action1;
3637
import rx.functions.Func1;
@@ -465,4 +466,214 @@ public void call() {
465466
assertEquals(2, effectCounter.get());
466467
}
467468
}
469+
470+
471+
/**
472+
* test the basic expectation of OperatorMulticast via replay
473+
*/
474+
@Test
475+
public void testIssue2191_UnsubscribeSource() {
476+
// setup mocks
477+
Action1 sourceNext = mock(Action1.class);
478+
Action0 sourceCompleted = mock(Action0.class);
479+
Action0 sourceUnsubscribed = mock(Action0.class);
480+
Observer spiedSubscriberBeforeConnect = mock(Observer.class);
481+
Observer spiedSubscriberAfterConnect = mock(Observer.class);
482+
483+
// Observable under test
484+
Observable<Integer> source = Observable.just(1,2);
485+
486+
ConnectableObservable<Integer> replay = source
487+
.doOnNext(sourceNext)
488+
.doOnUnsubscribe(sourceUnsubscribed)
489+
.doOnCompleted(sourceCompleted)
490+
.replay();
491+
492+
replay.subscribe(spiedSubscriberBeforeConnect);
493+
replay.subscribe(spiedSubscriberBeforeConnect);
494+
replay.connect();
495+
replay.subscribe(spiedSubscriberAfterConnect);
496+
replay.subscribe(spiedSubscriberAfterConnect);
497+
498+
499+
// verify interactions
500+
verify(sourceNext, times(1)).call(1);
501+
verify(sourceNext, times(1)).call(2);
502+
verify(sourceCompleted, times(1)).call();
503+
verifyObserverMock(spiedSubscriberBeforeConnect, 2, 4);
504+
verifyObserverMock(spiedSubscriberAfterConnect, 2, 4);
505+
506+
verify(sourceUnsubscribed, times(1)).call();
507+
508+
verifyNoMoreInteractions(sourceNext);
509+
verifyNoMoreInteractions(sourceCompleted);
510+
verifyNoMoreInteractions(sourceUnsubscribed);
511+
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
512+
verifyNoMoreInteractions(spiedSubscriberAfterConnect);
513+
514+
}
515+
516+
/**
517+
* Specifically test interaction with a Scheduler with subscribeOn
518+
*
519+
* @throws Exception
520+
*/
521+
@Test
522+
public void testIssue2191_SchedulerUnsubscribe() throws Exception {
523+
// setup mocks
524+
Action1 sourceNext = mock(Action1.class);
525+
Action0 sourceCompleted = mock(Action0.class);
526+
Action0 sourceUnsubscribed = mock(Action0.class);
527+
final Scheduler mockScheduler = mock(Scheduler.class);
528+
final Subscription mockSubscription = mock(Subscription.class);
529+
Worker spiedWorker = workerSpy(mockSubscription);
530+
Observer mockObserverBeforeConnect = mock(Observer.class);
531+
Observer mockObserverAfterConnect = mock(Observer.class);
532+
533+
when(mockScheduler.createWorker()).thenReturn(spiedWorker);
534+
535+
// Observable under test
536+
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3)
537+
.doOnNext(sourceNext)
538+
.doOnUnsubscribe(sourceUnsubscribed)
539+
.doOnCompleted(sourceCompleted)
540+
.subscribeOn(mockScheduler).replay();
541+
542+
replay.subscribe(mockObserverBeforeConnect);
543+
replay.subscribe(mockObserverBeforeConnect);
544+
replay.connect();
545+
replay.subscribe(mockObserverAfterConnect);
546+
replay.subscribe(mockObserverAfterConnect);
547+
548+
// verify interactions
549+
verify(sourceNext, times(1)).call(1);
550+
verify(sourceNext, times(1)).call(2);
551+
verify(sourceNext, times(1)).call(3);
552+
verify(sourceCompleted, times(1)).call();
553+
verify(mockScheduler, times(1)).createWorker();
554+
verify(spiedWorker, times(1)).schedule((Action0)notNull());
555+
verifyObserverMock(mockObserverBeforeConnect, 2, 6);
556+
verifyObserverMock(mockObserverAfterConnect, 2, 6);
557+
558+
verify(spiedWorker, times(1)).unsubscribe();
559+
verify(sourceUnsubscribed, times(1)).call();
560+
561+
verifyNoMoreInteractions(sourceNext);
562+
verifyNoMoreInteractions(sourceCompleted);
563+
verifyNoMoreInteractions(sourceUnsubscribed);
564+
verifyNoMoreInteractions(spiedWorker);
565+
verifyNoMoreInteractions(mockSubscription);
566+
verifyNoMoreInteractions(mockScheduler);
567+
verifyNoMoreInteractions(mockObserverBeforeConnect);
568+
verifyNoMoreInteractions(mockObserverAfterConnect);
569+
}
570+
571+
/**
572+
* Specifically test interaction with a Scheduler with subscribeOn
573+
*
574+
* @throws Exception
575+
*/
576+
@Test
577+
public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception {
578+
// setup mocks
579+
Action1 sourceNext = mock(Action1.class);
580+
Action0 sourceCompleted = mock(Action0.class);
581+
Action1 sourceError = mock(Action1.class);
582+
Action0 sourceUnsubscribed = mock(Action0.class);
583+
final Scheduler mockScheduler = mock(Scheduler.class);
584+
final Subscription mockSubscription = mock(Subscription.class);
585+
Worker spiedWorker = workerSpy(mockSubscription);
586+
Observer mockObserverBeforeConnect = mock(Observer.class);
587+
Observer mockObserverAfterConnect = mock(Observer.class);
588+
589+
when(mockScheduler.createWorker()).thenReturn(spiedWorker);
590+
591+
// Observable under test
592+
Func1<Integer, Integer> mockFunc = mock(Func1.class);
593+
IllegalArgumentException illegalArgumentException = new IllegalArgumentException();
594+
when(mockFunc.call(1)).thenReturn(1);
595+
when(mockFunc.call(2)).thenThrow(illegalArgumentException);
596+
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3).map(mockFunc)
597+
.doOnNext(sourceNext)
598+
.doOnUnsubscribe(sourceUnsubscribed)
599+
.doOnCompleted(sourceCompleted)
600+
.doOnError(sourceError)
601+
.subscribeOn(mockScheduler).replay();
602+
603+
replay.subscribe(mockObserverBeforeConnect);
604+
replay.subscribe(mockObserverBeforeConnect);
605+
replay.connect();
606+
replay.subscribe(mockObserverAfterConnect);
607+
replay.subscribe(mockObserverAfterConnect);
608+
609+
// verify interactions
610+
verify(mockScheduler, times(1)).createWorker();
611+
verify(spiedWorker, times(1)).schedule((Action0)notNull());
612+
verify(sourceNext, times(1)).call(1);
613+
verify(sourceError, times(1)).call(illegalArgumentException);
614+
verifyObserver(mockObserverBeforeConnect, 2, 2, illegalArgumentException);
615+
verifyObserver(mockObserverAfterConnect, 2, 2, illegalArgumentException);
616+
617+
verify(spiedWorker, times(1)).unsubscribe();
618+
verify(sourceUnsubscribed, times(1)).call();
619+
620+
verifyNoMoreInteractions(sourceNext);
621+
verifyNoMoreInteractions(sourceCompleted);
622+
verifyNoMoreInteractions(sourceError);
623+
verifyNoMoreInteractions(sourceUnsubscribed);
624+
verifyNoMoreInteractions(spiedWorker);
625+
verifyNoMoreInteractions(mockSubscription);
626+
verifyNoMoreInteractions(mockScheduler);
627+
verifyNoMoreInteractions(mockObserverBeforeConnect);
628+
verifyNoMoreInteractions(mockObserverAfterConnect);
629+
}
630+
631+
private static void verifyObserverMock(Observer mock, int numSubscriptions, int numItemsExpected) {
632+
verify(mock, times(numItemsExpected)).onNext(notNull());
633+
verify(mock, times(numSubscriptions)).onCompleted();
634+
verifyNoMoreInteractions(mock);
635+
}
636+
637+
private static void verifyObserver(Observer mock, int numSubscriptions, int numItemsExpected, Throwable error) {
638+
verify(mock, times(numItemsExpected)).onNext(notNull());
639+
verify(mock, times(numSubscriptions)).onError(error);
640+
verifyNoMoreInteractions(mock);
641+
}
642+
643+
public static Worker workerSpy(final Subscription mockSubscription) {
644+
return spy(new InprocessWorker(mockSubscription));
645+
}
646+
647+
648+
private static class InprocessWorker extends Worker {
649+
private final Subscription mockSubscription;
650+
public boolean unsubscribed;
651+
652+
public InprocessWorker(Subscription mockSubscription) {
653+
this.mockSubscription = mockSubscription;
654+
}
655+
656+
@Override
657+
public Subscription schedule(Action0 action) {
658+
action.call();
659+
return mockSubscription; // this subscription is returned but discarded
660+
}
661+
662+
@Override
663+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
664+
action.call();
665+
return mockSubscription;
666+
}
667+
668+
@Override
669+
public void unsubscribe() {
670+
unsubscribed = true;
671+
}
672+
673+
@Override
674+
public boolean isUnsubscribed() {
675+
return unsubscribed;
676+
}
677+
}
678+
468679
}

0 commit comments

Comments
 (0)