Skip to content

Commit d0c961b

Browse files
Merge branch 'replay' of git://github.com/johngmyers/RxJava into pull-218-merge-replay
Conflicts: rxjava-core/src/main/java/rx/operators/OperationTake.java rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java rxjava-core/src/main/java/rx/subjects/Subject.java - moved UnsubscribeTester into a package private class as I'm still undecided on long-term placement of this and the OperatorTester class - merged DefaultSubject and PublishedSubject which were the same thing (lots more unit tests though from this pull request) - still not sure what the right name is for DefaultSubject/PublishSubject - renamed RepeatSubject to ReplaySubject to match .Net - tweaked unit tests with InOrder while reviewing them to understand behavior
2 parents 8a38834 + af383b5 commit d0c961b

File tree

7 files changed

+746
-8
lines changed

7 files changed

+746
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public static class UnitTest {
6060
@SuppressWarnings("unchecked")
6161
public void testObserveOn() {
6262

63-
Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate()));
63+
Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate()));
6464

6565
Observer<Integer> observer = mock(Observer.class);
6666
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);

rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public static class UnitTest {
8181
public void testSubscribeOn() {
8282
Observable<Integer> w = Observable.toObservable(1, 2, 3);
8383

84-
Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate()));
84+
Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate()));
8585

8686
Observer<Integer> observer = mock(Observer.class);
8787
Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer);

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Matchers.*;
2020
import static org.mockito.Mockito.*;
21-
import static rx.operators.Tester.UnitTest.*;
21+
import static rx.operators.OperatorTester.UnitTest.*;
2222

2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicInteger;

rxjava-core/src/main/java/rx/operators/Tester.java renamed to rxjava-core/src/main/java/rx/operators/OperatorTester.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.operators;
217

318
import static org.junit.Assert.*;
@@ -24,14 +39,18 @@
2439
/**
2540
* Common utility functions for testing operator implementations.
2641
*/
27-
/* package */class Tester {
42+
/* package */class OperatorTester {
2843
/*
2944
* This is purposefully package-only so it does not leak into the public API outside of this package.
3045
*
3146
* This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility.
47+
*
48+
* benjchristensen => I'm procrastinating the decision of where and how these types of classes (see rx.subjects.UnsubscribeTester) should exist.
49+
* If they are only for internal implementations then I don't want them as part of the API.
50+
* If they are truly useful for everyone to use then an "rx.testing" package may make sense.
3251
*/
3352

34-
private Tester() {
53+
private OperatorTester() {
3554
}
3655

3756
public static class UnitTest {

rxjava-core/src/main/java/rx/subjects/DefaultSubject.java

Lines changed: 186 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,6 +15,9 @@
1515
*/
1616
package rx.subjects;
1717

18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
1821
import java.util.ArrayList;
1922
import java.util.List;
2023
import java.util.concurrent.ConcurrentHashMap;
@@ -24,6 +27,7 @@
2427
import junit.framework.Assert;
2528

2629
import org.junit.Test;
30+
import org.mockito.Mockito;
2731

2832
import rx.Notification;
2933
import rx.Observable;
@@ -32,6 +36,7 @@
3236
import rx.util.AtomicObservableSubscription;
3337
import rx.util.SynchronizedObserver;
3438
import rx.util.functions.Action1;
39+
import rx.util.functions.Func0;
3540
import rx.util.functions.Func1;
3641

3742
public class DefaultSubject<T> extends Subject<T, T> {
@@ -137,5 +142,183 @@ public void unsubscribe() {
137142

138143
sub.unsubscribe();
139144
}
145+
146+
private final Exception testException = new Exception();
147+
148+
@Test
149+
public void testCompleted() {
150+
DefaultSubject<Object> subject = DefaultSubject.create();
151+
152+
@SuppressWarnings("unchecked")
153+
Observer<String> aObserver = mock(Observer.class);
154+
subject.subscribe(aObserver);
155+
156+
subject.onNext("one");
157+
subject.onNext("two");
158+
subject.onNext("three");
159+
subject.onCompleted();
160+
161+
@SuppressWarnings("unchecked")
162+
Observer<String> anotherObserver = mock(Observer.class);
163+
subject.subscribe(anotherObserver);
164+
165+
subject.onNext("four");
166+
subject.onCompleted();
167+
subject.onError(new Exception());
168+
169+
assertCompletedObserver(aObserver);
170+
// todo bug? assertNeverObserver(anotherObserver);
171+
}
172+
173+
private void assertCompletedObserver(Observer<String> aObserver)
174+
{
175+
verify(aObserver, times(1)).onNext("one");
176+
verify(aObserver, times(1)).onNext("two");
177+
verify(aObserver, times(1)).onNext("three");
178+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
179+
verify(aObserver, times(1)).onCompleted();
180+
}
181+
182+
private void assertNeverObserver(Observer<String> aObserver)
183+
{
184+
verify(aObserver, Mockito.never()).onNext(any(String.class));
185+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
186+
verify(aObserver, Mockito.never()).onCompleted();
187+
}
188+
189+
@Test
190+
public void testError() {
191+
DefaultSubject<Object> subject = DefaultSubject.create();
192+
193+
@SuppressWarnings("unchecked")
194+
Observer<String> aObserver = mock(Observer.class);
195+
subject.subscribe(aObserver);
196+
197+
subject.onNext("one");
198+
subject.onNext("two");
199+
subject.onNext("three");
200+
subject.onError(testException);
201+
202+
@SuppressWarnings("unchecked")
203+
Observer<String> anotherObserver = mock(Observer.class);
204+
subject.subscribe(anotherObserver);
205+
206+
subject.onNext("four");
207+
subject.onError(new Exception());
208+
subject.onCompleted();
209+
210+
assertErrorObserver(aObserver);
211+
// todo bug? assertNeverObserver(anotherObserver);
212+
}
213+
214+
private void assertErrorObserver(Observer<String> aObserver)
215+
{
216+
verify(aObserver, times(1)).onNext("one");
217+
verify(aObserver, times(1)).onNext("two");
218+
verify(aObserver, times(1)).onNext("three");
219+
verify(aObserver, times(1)).onError(testException);
220+
verify(aObserver, Mockito.never()).onCompleted();
221+
}
222+
223+
@Test
224+
public void testSubscribeMidSequence() {
225+
DefaultSubject<Object> subject = DefaultSubject.create();
226+
227+
@SuppressWarnings("unchecked")
228+
Observer<String> aObserver = mock(Observer.class);
229+
subject.subscribe(aObserver);
230+
231+
subject.onNext("one");
232+
subject.onNext("two");
233+
234+
assertObservedUntilTwo(aObserver);
235+
236+
@SuppressWarnings("unchecked")
237+
Observer<String> anotherObserver = mock(Observer.class);
238+
subject.subscribe(anotherObserver);
239+
240+
subject.onNext("three");
241+
subject.onCompleted();
242+
243+
assertCompletedObserver(aObserver);
244+
assertCompletedStartingWithThreeObserver(anotherObserver);
245+
}
246+
247+
private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver)
248+
{
249+
verify(aObserver, Mockito.never()).onNext("one");
250+
verify(aObserver, Mockito.never()).onNext("two");
251+
verify(aObserver, times(1)).onNext("three");
252+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
253+
verify(aObserver, times(1)).onCompleted();
254+
}
255+
256+
@Test
257+
public void testUnsubscribeFirstObserver() {
258+
DefaultSubject<Object> subject = DefaultSubject.create();
259+
260+
@SuppressWarnings("unchecked")
261+
Observer<String> aObserver = mock(Observer.class);
262+
Subscription subscription = subject.subscribe(aObserver);
263+
264+
subject.onNext("one");
265+
subject.onNext("two");
266+
267+
subscription.unsubscribe();
268+
assertObservedUntilTwo(aObserver);
269+
270+
@SuppressWarnings("unchecked")
271+
Observer<String> anotherObserver = mock(Observer.class);
272+
subject.subscribe(anotherObserver);
273+
274+
subject.onNext("three");
275+
subject.onCompleted();
276+
277+
assertObservedUntilTwo(aObserver);
278+
assertCompletedStartingWithThreeObserver(anotherObserver);
279+
}
280+
281+
private void assertObservedUntilTwo(Observer<String> aObserver)
282+
{
283+
verify(aObserver, times(1)).onNext("one");
284+
verify(aObserver, times(1)).onNext("two");
285+
verify(aObserver, Mockito.never()).onNext("three");
286+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
287+
verify(aObserver, Mockito.never()).onCompleted();
288+
}
289+
290+
@Test
291+
public void testUnsubscribe()
292+
{
293+
UnsubscribeTester.test(new Func0<DefaultSubject<Object>>()
294+
{
295+
@Override
296+
public DefaultSubject<Object> call()
297+
{
298+
return DefaultSubject.create();
299+
}
300+
}, new Action1<DefaultSubject<Object>>()
301+
{
302+
@Override
303+
public void call(DefaultSubject<Object> DefaultSubject)
304+
{
305+
DefaultSubject.onCompleted();
306+
}
307+
}, new Action1<DefaultSubject<Object>>()
308+
{
309+
@Override
310+
public void call(DefaultSubject<Object> DefaultSubject)
311+
{
312+
DefaultSubject.onError(new Exception());
313+
}
314+
}, new Action1<DefaultSubject<Object>>()
315+
{
316+
@Override
317+
public void call(DefaultSubject<Object> DefaultSubject)
318+
{
319+
DefaultSubject.onNext("one");
320+
}
321+
});
322+
}
140323
}
141324
}

0 commit comments

Comments
 (0)