Skip to content

ReplaySubject - Manual Merge of Pull 218 #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public static class UnitTest {
@SuppressWarnings("unchecked")
public void testObserveOn() {

Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate()));
Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate()));

Observer<Integer> observer = mock(Observer.class);
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public static class UnitTest {
public void testSubscribeOn() {
Observable<Integer> w = Observable.toObservable(1, 2, 3);

Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate()));
Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate()));

Observer<Integer> observer = mock(Observer.class);
Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer);
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static org.junit.Assert.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
import static rx.operators.Tester.UnitTest.*;
import static rx.operators.OperatorTester.UnitTest.*;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.junit.Assert.*;
Expand All @@ -24,14 +39,18 @@
/**
* Common utility functions for testing operator implementations.
*/
/* package */class Tester {
/* package */class OperatorTester {
/*
* This is purposefully package-only so it does not leak into the public API outside of this package.
*
* This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility.
*
* benjchristensen => I'm procrastinating the decision of where and how these types of classes (see rx.subjects.UnsubscribeTester) should exist.
* If they are only for internal implementations then I don't want them as part of the API.
* If they are truly useful for everyone to use then an "rx.testing" package may make sense.
*/

private Tester() {
private OperatorTester() {
}

public static class UnitTest {
Expand Down
189 changes: 186 additions & 3 deletions rxjava-core/src/main/java/rx/subjects/DefaultSubject.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,6 +15,9 @@
*/
package rx.subjects;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -24,6 +27,7 @@
import junit.framework.Assert;

import org.junit.Test;
import org.mockito.Mockito;

import rx.Notification;
import rx.Observable;
Expand All @@ -32,6 +36,7 @@
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Action1;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public class DefaultSubject<T> extends Subject<T, T> {
Expand Down Expand Up @@ -137,5 +142,183 @@ public void unsubscribe() {

sub.unsubscribe();
}

private final Exception testException = new Exception();

@Test
public void testCompleted() {
DefaultSubject<Object> subject = DefaultSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onCompleted();

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

subject.onNext("four");
subject.onCompleted();
subject.onError(new Exception());

assertCompletedObserver(aObserver);
// todo bug? assertNeverObserver(anotherObserver);
}

private void assertCompletedObserver(Observer<String> aObserver)
{
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

private void assertNeverObserver(Observer<String> aObserver)
{
verify(aObserver, Mockito.never()).onNext(any(String.class));
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, Mockito.never()).onCompleted();
}

@Test
public void testError() {
DefaultSubject<Object> subject = DefaultSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onError(testException);

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

subject.onNext("four");
subject.onError(new Exception());
subject.onCompleted();

assertErrorObserver(aObserver);
// todo bug? assertNeverObserver(anotherObserver);
}

private void assertErrorObserver(Observer<String> aObserver)
{
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, times(1)).onError(testException);
verify(aObserver, Mockito.never()).onCompleted();
}

@Test
public void testSubscribeMidSequence() {
DefaultSubject<Object> subject = DefaultSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
subject.subscribe(aObserver);

subject.onNext("one");
subject.onNext("two");

assertObservedUntilTwo(aObserver);

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

subject.onNext("three");
subject.onCompleted();

assertCompletedObserver(aObserver);
assertCompletedStartingWithThreeObserver(anotherObserver);
}

private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver)
{
verify(aObserver, Mockito.never()).onNext("one");
verify(aObserver, Mockito.never()).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testUnsubscribeFirstObserver() {
DefaultSubject<Object> subject = DefaultSubject.create();

@SuppressWarnings("unchecked")
Observer<String> aObserver = mock(Observer.class);
Subscription subscription = subject.subscribe(aObserver);

subject.onNext("one");
subject.onNext("two");

subscription.unsubscribe();
assertObservedUntilTwo(aObserver);

@SuppressWarnings("unchecked")
Observer<String> anotherObserver = mock(Observer.class);
subject.subscribe(anotherObserver);

subject.onNext("three");
subject.onCompleted();

assertObservedUntilTwo(aObserver);
assertCompletedStartingWithThreeObserver(anotherObserver);
}

private void assertObservedUntilTwo(Observer<String> aObserver)
{
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, Mockito.never()).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, Mockito.never()).onCompleted();
}

@Test
public void testUnsubscribe()
{
UnsubscribeTester.test(new Func0<DefaultSubject<Object>>()
{
@Override
public DefaultSubject<Object> call()
{
return DefaultSubject.create();
}
}, new Action1<DefaultSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
{
DefaultSubject.onCompleted();
}
}, new Action1<DefaultSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
{
DefaultSubject.onError(new Exception());
}
}, new Action1<DefaultSubject<Object>>()
{
@Override
public void call(DefaultSubject<Object> DefaultSubject)
{
DefaultSubject.onNext("one");
}
});
}
}
}
Loading