Skip to content

Commit 74e54ad

Browse files
author
Duncan Irvine
committed
Fix for #2191 - OperatorMulticast fails to unsubscribe from source.
1 parent b37c7ed commit 74e54ad

File tree

2 files changed

+250
-1
lines changed

2 files changed

+250
-1
lines changed

src/main/java/rx/internal/operators/OperatorMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public void call() {
143143
subscriptionIsNull = subscription == null;
144144
}
145145
if (!subscriptionIsNull)
146-
source.unsafeSubscribe(subscription);
146+
source.subscribe(subscription);
147147
}
148148
}
149149
}
Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package rx.internal.operators;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
9+
import rx.Observable;
10+
import rx.Scheduler;
11+
import rx.Scheduler.Worker;
12+
import rx.Subscriber;
13+
import rx.Subscription;
14+
import rx.functions.Action0;
15+
import rx.functions.Action1;
16+
import rx.functions.Func1;
17+
import rx.observables.ConnectableObservable;
18+
19+
public class OperatorMulticastTest {
20+
21+
/**
22+
* test the basic expectation of OperatorMulticast via replay
23+
*/
24+
@Test
25+
public void testIssue2191_UnsubscribeSource() {
26+
// setup mocks
27+
Action1 sourceNext = mock(Action1.class);
28+
Action0 sourceCompleted = mock(Action0.class);
29+
Action0 sourceUnsubscribed = mock(Action0.class);
30+
Subscriber spiedSubscriberBeforeConnect = subscriberSpy();
31+
Subscriber spiedSubscriberAfterConnect = subscriberSpy();
32+
33+
// Observable under test
34+
Observable<Integer> source = Observable.just(1,2);
35+
36+
ConnectableObservable<Integer> replay = source
37+
.doOnNext(sourceNext)
38+
.doOnUnsubscribe(sourceUnsubscribed)
39+
.doOnCompleted(sourceCompleted)
40+
.replay();
41+
42+
replay.subscribe(spiedSubscriberBeforeConnect);
43+
replay.subscribe(spiedSubscriberBeforeConnect);
44+
replay.connect();
45+
replay.subscribe(spiedSubscriberAfterConnect);
46+
replay.subscribe(spiedSubscriberAfterConnect);
47+
48+
49+
// verify interactions
50+
verify(sourceNext, times(1)).call(1);
51+
verify(sourceNext, times(1)).call(2);
52+
verify(sourceCompleted, times(1)).call();
53+
verifySubscriberSpy(spiedSubscriberBeforeConnect, 2, 4);
54+
verifySubscriberSpy(spiedSubscriberAfterConnect, 2, 4);
55+
56+
verify(sourceUnsubscribed, times(1)).call();
57+
58+
verifyNoMoreInteractions(sourceNext);
59+
verifyNoMoreInteractions(sourceCompleted);
60+
verifyNoMoreInteractions(sourceUnsubscribed);
61+
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
62+
verifyNoMoreInteractions(spiedSubscriberAfterConnect);
63+
64+
}
65+
66+
/**
67+
* Specifically test interaction with a Scheduler with subscribeOn
68+
*
69+
* @throws Exception
70+
*/
71+
@Test
72+
public void testIssue2191_SchedulerUnsubscribe() throws Exception {
73+
// setup mocks
74+
Action1 sourceNext = mock(Action1.class);
75+
Action0 sourceCompleted = mock(Action0.class);
76+
Action0 sourceUnsubscribed = mock(Action0.class);
77+
final Scheduler mockScheduler = mock(Scheduler.class);
78+
final Subscription mockSubscription = mock(Subscription.class);
79+
Worker spiedWorker = workerSpy(mockSubscription);
80+
Subscriber spiedSubscriberBeforeConnect = subscriberSpy();
81+
Subscriber spiedSubscriberAfterConnect = subscriberSpy();
82+
83+
when(mockScheduler.createWorker()).thenReturn(spiedWorker);
84+
85+
// Observable under test
86+
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3)
87+
.doOnNext(sourceNext)
88+
.doOnUnsubscribe(sourceUnsubscribed)
89+
.doOnCompleted(sourceCompleted)
90+
.subscribeOn(mockScheduler).replay();
91+
92+
replay.subscribe(spiedSubscriberBeforeConnect);
93+
replay.subscribe(spiedSubscriberBeforeConnect);
94+
replay.connect();
95+
replay.subscribe(spiedSubscriberAfterConnect);
96+
replay.subscribe(spiedSubscriberAfterConnect);
97+
98+
// verify interactions
99+
verify(sourceNext, times(1)).call(1);
100+
verify(sourceNext, times(1)).call(2);
101+
verify(sourceNext, times(1)).call(3);
102+
verify(sourceCompleted, times(1)).call();
103+
verify(mockScheduler, times(1)).createWorker();
104+
verify(spiedWorker, times(1)).schedule((Action0)notNull());
105+
verifySubscriberSpy(spiedSubscriberBeforeConnect, 2, 6);
106+
verifySubscriberSpy(spiedSubscriberAfterConnect, 2, 6);
107+
108+
verify(spiedWorker, times(1)).unsubscribe();
109+
verify(sourceUnsubscribed, times(1)).call();
110+
111+
verifyNoMoreInteractions(sourceNext);
112+
verifyNoMoreInteractions(sourceCompleted);
113+
verifyNoMoreInteractions(sourceUnsubscribed);
114+
verifyNoMoreInteractions(spiedWorker);
115+
verifyNoMoreInteractions(mockSubscription);
116+
verifyNoMoreInteractions(mockScheduler);
117+
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
118+
verifyNoMoreInteractions(spiedSubscriberAfterConnect);
119+
}
120+
121+
/**
122+
* Specifically test interaction with a Scheduler with subscribeOn
123+
*
124+
* @throws Exception
125+
*/
126+
@Test
127+
public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception {
128+
// setup mocks
129+
Action1 sourceNext = mock(Action1.class);
130+
Action0 sourceCompleted = mock(Action0.class);
131+
Action1 sourceError = mock(Action1.class);
132+
Action0 sourceUnsubscribed = mock(Action0.class);
133+
final Scheduler mockScheduler = mock(Scheduler.class);
134+
final Subscription mockSubscription = mock(Subscription.class);
135+
Worker spiedWorker = workerSpy(mockSubscription);
136+
Subscriber spiedSubscriberBeforeConnect = subscriberSpy();
137+
Subscriber spiedSubscriberAfterConnect = subscriberSpy();
138+
139+
when(mockScheduler.createWorker()).thenReturn(spiedWorker);
140+
141+
// Observable under test
142+
Func1<Integer, Integer> mockFunc = mock(Func1.class);
143+
IllegalArgumentException illegalArgumentException = new IllegalArgumentException();
144+
when(mockFunc.call(1)).thenReturn(1);
145+
when(mockFunc.call(2)).thenThrow(illegalArgumentException);
146+
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3).map(mockFunc)
147+
.doOnNext(sourceNext)
148+
.doOnUnsubscribe(sourceUnsubscribed)
149+
.doOnCompleted(sourceCompleted)
150+
.doOnError(sourceError)
151+
.subscribeOn(mockScheduler).replay();
152+
153+
replay.subscribe(spiedSubscriberBeforeConnect);
154+
replay.subscribe(spiedSubscriberBeforeConnect);
155+
replay.connect();
156+
replay.subscribe(spiedSubscriberAfterConnect);
157+
replay.subscribe(spiedSubscriberAfterConnect);
158+
159+
// verify interactions
160+
verify(mockScheduler, times(1)).createWorker();
161+
verify(spiedWorker, times(1)).schedule((Action0)notNull());
162+
verify(sourceNext, times(1)).call(1);
163+
verify(sourceError, times(1)).call(illegalArgumentException);
164+
verifySubscriberSpy(spiedSubscriberBeforeConnect, 2, 2, illegalArgumentException);
165+
verifySubscriberSpy(spiedSubscriberAfterConnect, 2, 2, illegalArgumentException);
166+
167+
verify(spiedWorker, times(1)).unsubscribe();
168+
verify(sourceUnsubscribed, times(1)).call();
169+
170+
verifyNoMoreInteractions(sourceNext);
171+
verifyNoMoreInteractions(sourceCompleted);
172+
verifyNoMoreInteractions(sourceError);
173+
verifyNoMoreInteractions(sourceUnsubscribed);
174+
verifyNoMoreInteractions(spiedWorker);
175+
verifyNoMoreInteractions(mockSubscription);
176+
verifyNoMoreInteractions(mockScheduler);
177+
verifyNoMoreInteractions(spiedSubscriberBeforeConnect);
178+
verifyNoMoreInteractions(spiedSubscriberAfterConnect);
179+
}
180+
181+
public static Subscriber subscriberSpy() {
182+
return spy(new EmptySubscriber());
183+
}
184+
185+
private void verifySubscriberSpy(Subscriber spiedSubscriber, int numSubscriptions, int numItemsExpected) {
186+
verify(spiedSubscriber, times(numSubscriptions)).onStart();
187+
verify(spiedSubscriber, times(numItemsExpected)).onNext(notNull());
188+
verify(spiedSubscriber, times(numSubscriptions)).onCompleted();
189+
verifyNoMoreInteractions(spiedSubscriber);
190+
}
191+
private void verifySubscriberSpy(Subscriber spiedSubscriber, int numSubscriptions, int numItemsExpected, Throwable error) {
192+
verify(spiedSubscriber, times(numSubscriptions)).onStart();
193+
verify(spiedSubscriber, times(numItemsExpected)).onNext(notNull());
194+
verify(spiedSubscriber, times(numSubscriptions)).onError(error);
195+
verifyNoMoreInteractions(spiedSubscriber);
196+
}
197+
198+
public static Worker workerSpy(final Subscription mockSubscription) {
199+
return spy(new InprocessWorker(mockSubscription));
200+
}
201+
202+
private static class EmptySubscriber extends Subscriber {
203+
@Override
204+
public void onCompleted() {
205+
206+
}
207+
208+
@Override
209+
public void onError(Throwable e) {
210+
211+
}
212+
213+
@Override
214+
public void onNext(Object o) {
215+
216+
}
217+
}
218+
219+
private static class InprocessWorker extends Worker {
220+
private final Subscription mockSubscription;
221+
public boolean unsubscribed;
222+
223+
public InprocessWorker(Subscription mockSubscription) {
224+
this.mockSubscription = mockSubscription;
225+
}
226+
227+
@Override
228+
public Subscription schedule(Action0 action) {
229+
action.call();
230+
return mockSubscription; // this subscription is returned but discarded
231+
}
232+
233+
@Override
234+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
235+
action.call();
236+
return mockSubscription;
237+
}
238+
239+
@Override
240+
public void unsubscribe() {
241+
unsubscribed = true;
242+
}
243+
244+
@Override
245+
public boolean isUnsubscribed() {
246+
return unsubscribed;
247+
}
248+
}
249+
}

0 commit comments

Comments
 (0)