Skip to content

Fix for #2191 - OperatorMulticast fails to unsubscribe from source #2455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void call() {
subscriptionIsNull = subscription == null;
}
if (!subscriptionIsNull)
source.unsafeSubscribe(subscription);
source.subscribe(subscription);
}
}
}
221 changes: 216 additions & 5 deletions src/test/java/rx/internal/operators/OperatorReplayTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,22 @@
package rx.internal.operators;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.*;

import org.junit.Test;
import org.mockito.InOrder;


import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
Expand Down Expand Up @@ -465,4 +466,214 @@ public void call() {
assertEquals(2, effectCounter.get());
}
}


/**
* test the basic expectation of OperatorMulticast via replay
*/
@Test
public void testIssue2191_UnsubscribeSource() {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action0 sourceUnsubscribed = mock(Action0.class);
Observer spiedSubscriberBeforeConnect = mock(Observer.class);
Observer spiedSubscriberAfterConnect = mock(Observer.class);

// Observable under test
Observable<Integer> source = Observable.just(1,2);

ConnectableObservable<Integer> replay = source
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.replay();

replay.subscribe(spiedSubscriberBeforeConnect);
replay.subscribe(spiedSubscriberBeforeConnect);
replay.connect();
replay.subscribe(spiedSubscriberAfterConnect);
replay.subscribe(spiedSubscriberAfterConnect);


// verify interactions
verify(sourceNext, times(1)).call(1);
verify(sourceNext, times(1)).call(2);
verify(sourceCompleted, times(1)).call();
verifyObserverMock(spiedSubscriberBeforeConnect, 2, 4);
verifyObserverMock(spiedSubscriberAfterConnect, 2, 4);

verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
verifyNoMoreInteractions(spiedSubscriberAfterConnect);

}

/**
* Specifically test interaction with a Scheduler with subscribeOn
*
* @throws Exception
*/
@Test
public void testIssue2191_SchedulerUnsubscribe() throws Exception {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action0 sourceUnsubscribed = mock(Action0.class);
final Scheduler mockScheduler = mock(Scheduler.class);
final Subscription mockSubscription = mock(Subscription.class);
Worker spiedWorker = workerSpy(mockSubscription);
Observer mockObserverBeforeConnect = mock(Observer.class);
Observer mockObserverAfterConnect = mock(Observer.class);

when(mockScheduler.createWorker()).thenReturn(spiedWorker);

// Observable under test
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3)
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.subscribeOn(mockScheduler).replay();

replay.subscribe(mockObserverBeforeConnect);
replay.subscribe(mockObserverBeforeConnect);
replay.connect();
replay.subscribe(mockObserverAfterConnect);
replay.subscribe(mockObserverAfterConnect);

// verify interactions
verify(sourceNext, times(1)).call(1);
verify(sourceNext, times(1)).call(2);
verify(sourceNext, times(1)).call(3);
verify(sourceCompleted, times(1)).call();
verify(mockScheduler, times(1)).createWorker();
verify(spiedWorker, times(1)).schedule((Action0)notNull());
verifyObserverMock(mockObserverBeforeConnect, 2, 6);
verifyObserverMock(mockObserverAfterConnect, 2, 6);

verify(spiedWorker, times(1)).unsubscribe();
verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedWorker);
verifyNoMoreInteractions(mockSubscription);
verifyNoMoreInteractions(mockScheduler);
verifyNoMoreInteractions(mockObserverBeforeConnect);
verifyNoMoreInteractions(mockObserverAfterConnect);
}

/**
* Specifically test interaction with a Scheduler with subscribeOn
*
* @throws Exception
*/
@Test
public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception {
// setup mocks
Action1 sourceNext = mock(Action1.class);
Action0 sourceCompleted = mock(Action0.class);
Action1 sourceError = mock(Action1.class);
Action0 sourceUnsubscribed = mock(Action0.class);
final Scheduler mockScheduler = mock(Scheduler.class);
final Subscription mockSubscription = mock(Subscription.class);
Worker spiedWorker = workerSpy(mockSubscription);
Observer mockObserverBeforeConnect = mock(Observer.class);
Observer mockObserverAfterConnect = mock(Observer.class);

when(mockScheduler.createWorker()).thenReturn(spiedWorker);

// Observable under test
Func1<Integer, Integer> mockFunc = mock(Func1.class);
IllegalArgumentException illegalArgumentException = new IllegalArgumentException();
when(mockFunc.call(1)).thenReturn(1);
when(mockFunc.call(2)).thenThrow(illegalArgumentException);
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3).map(mockFunc)
.doOnNext(sourceNext)
.doOnUnsubscribe(sourceUnsubscribed)
.doOnCompleted(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();

replay.subscribe(mockObserverBeforeConnect);
replay.subscribe(mockObserverBeforeConnect);
replay.connect();
replay.subscribe(mockObserverAfterConnect);
replay.subscribe(mockObserverAfterConnect);

// verify interactions
verify(mockScheduler, times(1)).createWorker();
verify(spiedWorker, times(1)).schedule((Action0)notNull());
verify(sourceNext, times(1)).call(1);
verify(sourceError, times(1)).call(illegalArgumentException);
verifyObserver(mockObserverBeforeConnect, 2, 2, illegalArgumentException);
verifyObserver(mockObserverAfterConnect, 2, 2, illegalArgumentException);

verify(spiedWorker, times(1)).unsubscribe();
verify(sourceUnsubscribed, times(1)).call();

verifyNoMoreInteractions(sourceNext);
verifyNoMoreInteractions(sourceCompleted);
verifyNoMoreInteractions(sourceError);
verifyNoMoreInteractions(sourceUnsubscribed);
verifyNoMoreInteractions(spiedWorker);
verifyNoMoreInteractions(mockSubscription);
verifyNoMoreInteractions(mockScheduler);
verifyNoMoreInteractions(mockObserverBeforeConnect);
verifyNoMoreInteractions(mockObserverAfterConnect);
}

private static void verifyObserverMock(Observer mock, int numSubscriptions, int numItemsExpected) {
verify(mock, times(numItemsExpected)).onNext(notNull());
verify(mock, times(numSubscriptions)).onCompleted();
verifyNoMoreInteractions(mock);
}

private static void verifyObserver(Observer mock, int numSubscriptions, int numItemsExpected, Throwable error) {
verify(mock, times(numItemsExpected)).onNext(notNull());
verify(mock, times(numSubscriptions)).onError(error);
verifyNoMoreInteractions(mock);
}

public static Worker workerSpy(final Subscription mockSubscription) {
return spy(new InprocessWorker(mockSubscription));
}


private static class InprocessWorker extends Worker {
private final Subscription mockSubscription;
public boolean unsubscribed;

public InprocessWorker(Subscription mockSubscription) {
this.mockSubscription = mockSubscription;
}

@Override
public Subscription schedule(Action0 action) {
action.call();
return mockSubscription; // this subscription is returned but discarded
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
action.call();
return mockSubscription;
}

@Override
public void unsubscribe() {
unsubscribed = true;
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
}

}