Skip to content

Commit 0d420d7

Browse files
committed
Add unit tests to PublishSubject
1 parent acfe92f commit 0d420d7

File tree

1 file changed

+181
-0
lines changed

1 file changed

+181
-0
lines changed

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,23 @@
1010

1111
import org.junit.Test;
1212

13+
import org.mockito.Mockito;
1314
import rx.Notification;
1415
import rx.Observable;
1516
import rx.Observer;
1617
import rx.Subscription;
18+
import rx.testing.UnsubscribeTester;
1719
import rx.util.AtomicObservableSubscription;
1820
import rx.util.SynchronizedObserver;
1921
import rx.util.functions.Action1;
22+
import rx.util.functions.Func0;
2023
import rx.util.functions.Func1;
2124

25+
import static org.mockito.Matchers.any;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
2230
public class PublishSubject<T> extends Subject<T, T> {
2331
public static <T> PublishSubject<T> create() {
2432
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
@@ -122,5 +130,178 @@ public void unsubscribe() {
122130

123131
sub.unsubscribe();
124132
}
133+
134+
private final Exception testException = new Exception();
135+
136+
@Test
137+
public void testCompleted() {
138+
PublishSubject<Object> subject = PublishSubject.create();
139+
140+
Observer<String> aObserver = mock(Observer.class);
141+
subject.subscribe(aObserver);
142+
143+
subject.onNext("one");
144+
subject.onNext("two");
145+
subject.onNext("three");
146+
subject.onCompleted();
147+
148+
Observer<String> anotherObserver = mock(Observer.class);
149+
subject.subscribe(anotherObserver);
150+
151+
subject.onNext("four");
152+
subject.onCompleted();
153+
subject.onError(new Exception());
154+
155+
assertCompletedObserver(aObserver);
156+
// todo bug? assertNeverObserver(anotherObserver);
157+
}
158+
159+
private void assertCompletedObserver(Observer<String> aObserver)
160+
{
161+
verify(aObserver, times(1)).onNext("one");
162+
verify(aObserver, times(1)).onNext("two");
163+
verify(aObserver, times(1)).onNext("three");
164+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
165+
verify(aObserver, times(1)).onCompleted();
166+
}
167+
168+
private void assertNeverObserver(Observer<String> aObserver)
169+
{
170+
verify(aObserver, Mockito.never()).onNext(any(String.class));
171+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
172+
verify(aObserver, Mockito.never()).onCompleted();
173+
}
174+
175+
@Test
176+
public void testError() {
177+
PublishSubject<Object> subject = PublishSubject.create();
178+
179+
Observer<String> aObserver = mock(Observer.class);
180+
subject.subscribe(aObserver);
181+
182+
subject.onNext("one");
183+
subject.onNext("two");
184+
subject.onNext("three");
185+
subject.onError(testException);
186+
187+
Observer<String> anotherObserver = mock(Observer.class);
188+
subject.subscribe(anotherObserver);
189+
190+
subject.onNext("four");
191+
subject.onError(new Exception());
192+
subject.onCompleted();
193+
194+
assertErrorObserver(aObserver);
195+
// todo bug? assertNeverObserver(anotherObserver);
196+
}
197+
198+
private void assertErrorObserver(Observer<String> aObserver)
199+
{
200+
verify(aObserver, times(1)).onNext("one");
201+
verify(aObserver, times(1)).onNext("two");
202+
verify(aObserver, times(1)).onNext("three");
203+
verify(aObserver, times(1)).onError(testException);
204+
verify(aObserver, Mockito.never()).onCompleted();
205+
}
206+
207+
208+
@Test
209+
public void testSubscribeMidSequence() {
210+
PublishSubject<Object> subject = PublishSubject.create();
211+
212+
Observer<String> aObserver = mock(Observer.class);
213+
subject.subscribe(aObserver);
214+
215+
subject.onNext("one");
216+
subject.onNext("two");
217+
218+
assertObservedUntilTwo(aObserver);
219+
220+
Observer<String> anotherObserver = mock(Observer.class);
221+
subject.subscribe(anotherObserver);
222+
223+
subject.onNext("three");
224+
subject.onCompleted();
225+
226+
assertCompletedObserver(aObserver);
227+
assertCompletedStartingWithThreeObserver(anotherObserver);
228+
}
229+
230+
231+
private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver)
232+
{
233+
verify(aObserver, Mockito.never()).onNext("one");
234+
verify(aObserver, Mockito.never()).onNext("two");
235+
verify(aObserver, times(1)).onNext("three");
236+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
237+
verify(aObserver, times(1)).onCompleted();
238+
}
239+
240+
@Test
241+
public void testUnsubscribeFirstObserver() {
242+
PublishSubject<Object> subject = PublishSubject.create();
243+
244+
Observer<String> aObserver = mock(Observer.class);
245+
Subscription subscription = subject.subscribe(aObserver);
246+
247+
subject.onNext("one");
248+
subject.onNext("two");
249+
250+
subscription.unsubscribe();
251+
assertObservedUntilTwo(aObserver);
252+
253+
Observer<String> anotherObserver = mock(Observer.class);
254+
subject.subscribe(anotherObserver);
255+
256+
subject.onNext("three");
257+
subject.onCompleted();
258+
259+
assertObservedUntilTwo(aObserver);
260+
assertCompletedStartingWithThreeObserver(anotherObserver);
261+
}
262+
263+
private void assertObservedUntilTwo(Observer<String> aObserver)
264+
{
265+
verify(aObserver, times(1)).onNext("one");
266+
verify(aObserver, times(1)).onNext("two");
267+
verify(aObserver, Mockito.never()).onNext("three");
268+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
269+
verify(aObserver, Mockito.never()).onCompleted();
270+
}
271+
272+
@Test
273+
public void testUnsubscribe()
274+
{
275+
UnsubscribeTester.test(new Func0<PublishSubject<Object>>()
276+
{
277+
@Override
278+
public PublishSubject<Object> call()
279+
{
280+
return PublishSubject.create();
281+
}
282+
}, new Action1<PublishSubject<Object>>()
283+
{
284+
@Override
285+
public void call(PublishSubject<Object> PublishSubject)
286+
{
287+
PublishSubject.onCompleted();
288+
}
289+
}, new Action1<PublishSubject<Object>>()
290+
{
291+
@Override
292+
public void call(PublishSubject<Object> PublishSubject)
293+
{
294+
PublishSubject.onError(new Exception());
295+
}
296+
}, new Action1<PublishSubject<Object>>()
297+
{
298+
@Override
299+
public void call(PublishSubject<Object> PublishSubject)
300+
{
301+
PublishSubject.onNext("one");
302+
}
303+
}
304+
);
305+
}
125306
}
126307
}

0 commit comments

Comments
 (0)