Skip to content

Commit b3384e0

Browse files
committed
Merge pull request #2936 from jacek-marchwicki/fix-observer-bug
Fix TestSubject bug
2 parents fe66cc9 + da6ef51 commit b3384e0

File tree

2 files changed

+144
-17
lines changed

2 files changed

+144
-17
lines changed

src/main/java/rx/subjects/TestSubject.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.concurrent.TimeUnit;
19-
2018
import rx.Observer;
2119
import rx.Scheduler;
2220
import rx.functions.Action0;
@@ -25,6 +23,8 @@
2523
import rx.schedulers.TestScheduler;
2624
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2725

26+
import java.util.concurrent.TimeUnit;
27+
2828
/**
2929
* A variety of Subject that is useful for testing purposes. It operates on a {@link TestScheduler} and allows
3030
* you to precisely time emissions and notifications to the Subject's subscribers using relative virtual time
@@ -68,11 +68,11 @@ protected TestSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T>
6868
}
6969

7070
/**
71-
* Schedule a call to {@code onCompleted} at relative time of "now()" on TestScheduler.
71+
* Schedule a call to {@code onCompleted} on TestScheduler.
7272
*/
7373
@Override
7474
public void onCompleted() {
75-
onCompleted(innerScheduler.now());
75+
onCompleted(0);
7676
}
7777

7878
private void _onCompleted() {
@@ -86,26 +86,26 @@ private void _onCompleted() {
8686
/**
8787
* Schedule a call to {@code onCompleted} relative to "now()" +n milliseconds in the future.
8888
*
89-
* @param timeInMilliseconds
89+
* @param delayTime
9090
* the number of milliseconds in the future relative to "now()" at which to call {@code onCompleted}
9191
*/
92-
public void onCompleted(long timeInMilliseconds) {
92+
public void onCompleted(long delayTime) {
9393
innerScheduler.schedule(new Action0() {
9494

9595
@Override
9696
public void call() {
9797
_onCompleted();
9898
}
9999

100-
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
100+
}, delayTime, TimeUnit.MILLISECONDS);
101101
}
102102

103103
/**
104-
* Schedule a call to {@code onError} at relative time of "now()" on TestScheduler.
104+
* Schedule a call to {@code onError} on TestScheduler.
105105
*/
106106
@Override
107107
public void onError(final Throwable e) {
108-
onError(e, innerScheduler.now());
108+
onError(e, 0);
109109
}
110110

111111
private void _onError(final Throwable e) {
@@ -121,26 +121,26 @@ private void _onError(final Throwable e) {
121121
*
122122
* @param e
123123
* the {@code Throwable} to pass to the {@code onError} method
124-
* @param timeInMilliseconds
124+
* @param dalayTime
125125
* the number of milliseconds in the future relative to "now()" at which to call {@code onError}
126126
*/
127-
public void onError(final Throwable e, long timeInMilliseconds) {
127+
public void onError(final Throwable e, long dalayTime) {
128128
innerScheduler.schedule(new Action0() {
129129

130130
@Override
131131
public void call() {
132132
_onError(e);
133133
}
134134

135-
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
135+
}, dalayTime, TimeUnit.MILLISECONDS);
136136
}
137137

138138
/**
139-
* Schedule a call to {@code onNext} at relative time of "now()" on TestScheduler.
139+
* Schedule a call to {@code onNext} on TestScheduler.
140140
*/
141141
@Override
142142
public void onNext(T v) {
143-
onNext(v, innerScheduler.now());
143+
onNext(v, 0);
144144
}
145145

146146
private void _onNext(T v) {
@@ -154,18 +154,18 @@ private void _onNext(T v) {
154154
*
155155
* @param v
156156
* the item to emit
157-
* @param timeInMilliseconds
157+
* @param delayTime
158158
* the number of milliseconds in the future relative to "now()" at which to call {@code onNext}
159159
*/
160-
public void onNext(final T v, long timeInMilliseconds) {
160+
public void onNext(final T v, long delayTime) {
161161
innerScheduler.schedule(new Action0() {
162162

163163
@Override
164164
public void call() {
165165
_onNext(v);
166166
}
167167

168-
}, timeInMilliseconds, TimeUnit.MILLISECONDS);
168+
}, delayTime, TimeUnit.MILLISECONDS);
169169
}
170170

171171
@Override
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import org.junit.Test;
19+
import rx.Observer;
20+
import rx.schedulers.TestScheduler;
21+
22+
import java.io.IOException;
23+
import java.util.concurrent.TimeUnit;
24+
25+
import static org.mockito.Mockito.*;
26+
27+
public class TestSubjectTest {
28+
29+
@Test
30+
public void testObserverPropagateValueAfterTriggeringActions() {
31+
final TestScheduler scheduler = new TestScheduler();
32+
33+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
34+
@SuppressWarnings("unchecked")
35+
Observer<Integer> observer = mock(Observer.class);
36+
subject.subscribe(observer);
37+
38+
subject.onNext(1);
39+
scheduler.triggerActions();
40+
41+
verify(observer, times(1)).onNext(1);
42+
}
43+
44+
@Test
45+
public void testObserverPropagateValueInFutureTimeAfterTriggeringActions() {
46+
final TestScheduler scheduler = new TestScheduler();
47+
scheduler.advanceTimeTo(100, TimeUnit.SECONDS);
48+
49+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
50+
@SuppressWarnings("unchecked")
51+
Observer<Integer> observer = mock(Observer.class);
52+
subject.subscribe(observer);
53+
54+
subject.onNext(1);
55+
scheduler.triggerActions();
56+
57+
verify(observer, times(1)).onNext(1);
58+
}
59+
60+
61+
62+
@Test
63+
public void testObserverPropagateErrorAfterTriggeringActions() {
64+
final IOException e = new IOException();
65+
final TestScheduler scheduler = new TestScheduler();
66+
67+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
68+
@SuppressWarnings("unchecked")
69+
Observer<Integer> observer = mock(Observer.class);
70+
subject.subscribe(observer);
71+
72+
subject.onError(e);
73+
scheduler.triggerActions();
74+
75+
verify(observer, times(1)).onError(e);
76+
}
77+
78+
@Test
79+
public void testObserverPropagateErrorInFutureTimeAfterTriggeringActions() {
80+
final IOException e = new IOException();
81+
final TestScheduler scheduler = new TestScheduler();
82+
scheduler.advanceTimeTo(100, TimeUnit.SECONDS);
83+
84+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
85+
@SuppressWarnings("unchecked")
86+
Observer<Integer> observer = mock(Observer.class);
87+
subject.subscribe(observer);
88+
89+
subject.onError(e);
90+
scheduler.triggerActions();
91+
92+
verify(observer, times(1)).onError(e);
93+
}
94+
95+
96+
97+
@Test
98+
public void testObserverPropagateCompletedAfterTriggeringActions() {
99+
final TestScheduler scheduler = new TestScheduler();
100+
101+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
102+
@SuppressWarnings("unchecked")
103+
Observer<Integer> observer = mock(Observer.class);
104+
subject.subscribe(observer);
105+
106+
subject.onCompleted();
107+
scheduler.triggerActions();
108+
109+
verify(observer, times(1)).onCompleted();
110+
}
111+
112+
@Test
113+
public void testObserverPropagateCompletedInFutureTimeAfterTriggeringActions() {
114+
final TestScheduler scheduler = new TestScheduler();
115+
scheduler.advanceTimeTo(100, TimeUnit.SECONDS);
116+
117+
final TestSubject<Integer> subject = TestSubject.create(scheduler);
118+
@SuppressWarnings("unchecked")
119+
Observer<Integer> observer = mock(Observer.class);
120+
subject.subscribe(observer);
121+
122+
subject.onCompleted();
123+
scheduler.triggerActions();
124+
125+
verify(observer, times(1)).onCompleted();
126+
}
127+
}

0 commit comments

Comments
 (0)