Skip to content

Commit 74d82ea

Browse files
committed
Unit tests for multicast
1 parent ac9201d commit 74d82ea

File tree

1 file changed

+82
-0
lines changed

1 file changed

+82
-0
lines changed

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package rx.operators;
22

3+
import org.junit.Test;
34
import rx.Observable;
45
import rx.Observer;
56
import rx.Subscription;
67
import rx.observables.ConnectableObservable;
8+
import rx.subjects.DefaultSubject;
79
import rx.subjects.Subject;
810
import rx.util.functions.Func1;
911

12+
import static org.mockito.Mockito.*;
13+
1014
public class OperatorMulticast {
1115
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
1216
return new MulticastConnectableObservable<T ,R>(source, subject);
@@ -49,5 +53,83 @@ public void onNext(T args) {
4953

5054
}
5155

56+
public static class UnitTest {
57+
58+
@Test
59+
public void testMulticast() {
60+
Subscription s = mock(Subscription.class);
61+
TestObservable source = new TestObservable(s);
62+
63+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
64+
DefaultSubject.<String>create());
65+
66+
Observer<String> observer = mock(Observer.class);
67+
multicasted.subscribe(observer);
68+
69+
source.sendOnNext("one");
70+
source.sendOnNext("two");
71+
72+
multicasted.connect();
73+
74+
source.sendOnNext("three");
75+
source.sendOnNext("four");
76+
source.sendOnCompleted();
77+
78+
verify(observer, never()).onNext("one");
79+
verify(observer, never()).onNext("two");
80+
verify(observer, times(1)).onNext("three");
81+
verify(observer, times(1)).onNext("four");
82+
verify(observer, times(1)).onCompleted();
83+
84+
}
85+
86+
87+
private static class TestObservable extends Observable<String> {
88+
89+
Observer<String> observer = new Observer<String>() {
90+
@Override
91+
public void onCompleted() {
92+
// Do nothing
93+
}
5294

95+
@Override
96+
public void onError(Exception e) {
97+
// Do nothing
98+
}
99+
100+
@Override
101+
public void onNext(String args) {
102+
// Do nothing
103+
}
104+
};
105+
Subscription s;
106+
107+
public TestObservable(Subscription s) {
108+
this.s = s;
109+
}
110+
111+
/* used to simulate subscription */
112+
public void sendOnCompleted() {
113+
observer.onCompleted();
114+
}
115+
116+
/* used to simulate subscription */
117+
public void sendOnNext(String value) {
118+
observer.onNext(value);
119+
}
120+
121+
/* used to simulate subscription */
122+
public void sendOnError(Exception e) {
123+
observer.onError(e);
124+
}
125+
126+
@Override
127+
public Subscription subscribe(final Observer<String> observer) {
128+
this.observer = observer;
129+
return s;
130+
}
131+
132+
}
133+
134+
}
53135
}

0 commit comments

Comments
 (0)