15
15
*/
16
16
package rx .operators ;
17
17
18
- import static org .junit .Assert .*;
19
18
import static org .mockito .Matchers .*;
20
19
import static org .mockito .Mockito .*;
21
- import static rx .operators .Tester .UnitTest .*;
22
20
23
21
import java .util .concurrent .TimeUnit ;
22
+ import java .util .concurrent .atomic .AtomicBoolean ;
24
23
24
+ import org .junit .Before ;
25
25
import org .junit .Test ;
26
+ import org .mockito .InOrder ;
26
27
28
+ import rx .Observable ;
27
29
import rx .Observer ;
28
30
import rx .Scheduler ;
29
31
import rx .Subscription ;
30
32
import rx .concurrency .Schedulers ;
31
- import rx .util .functions .Func0 ;
33
+ import rx .concurrency .TestScheduler ;
34
+ import rx .subscriptions .Subscriptions ;
35
+ import rx .util .functions .Action0 ;
32
36
import rx .util .functions .Func1 ;
33
37
34
38
/**
@@ -57,6 +61,7 @@ private static class Interval implements Func1<Observer<Long>, Subscription> {
57
61
private final Scheduler scheduler ;
58
62
59
63
private long currentValue ;
64
+ private final AtomicBoolean complete = new AtomicBoolean ();
60
65
61
66
private Interval (long interval , TimeUnit unit , Scheduler scheduler ) {
62
67
this .interval = interval ;
@@ -66,18 +71,69 @@ private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
66
71
67
72
@ Override
68
73
public Subscription call (final Observer <Long > observer ) {
69
- return scheduler .schedule (new Func0 <Subscription >() {
70
- @ Override
71
- public Subscription call () {
74
+ scheduler .schedule (new IntervalAction (observer ), interval , unit );
75
+ return Subscriptions .create (new Action0 () {
76
+ @ Override
77
+ public void call () {
78
+ complete .set (true );
79
+ }
80
+ });
81
+ }
82
+
83
+ private class IntervalAction implements Action0 {
84
+ private final Observer <Long > observer ;
85
+
86
+ private IntervalAction (Observer <Long > observer ) {
87
+ this .observer = observer ;
88
+ }
89
+
90
+ @ Override
91
+ public void call () {
92
+ if (complete .get ()) {
93
+ observer .onCompleted ();
94
+ } else {
72
95
observer .onNext (currentValue );
73
96
currentValue ++;
74
- return Interval . this . call ( observer );
97
+ scheduler . schedule ( this , interval , unit );
75
98
}
76
- }, interval , unit );
99
+ }
77
100
}
78
101
}
79
102
80
103
public static class UnitTest {
81
- // TODO
104
+ private TestScheduler scheduler ;
105
+ private Observer <Long > observer ;
106
+
107
+ @ Before
108
+ @ SuppressWarnings ("unchecked" ) // due to mocking
109
+ public void before () {
110
+ scheduler = new TestScheduler ();
111
+ observer = mock (Observer .class );
112
+ }
113
+
114
+ @ Test
115
+ public void testInterval () {
116
+ Observable <Long > w = Observable .create (OperationInterval .interval (1 , TimeUnit .SECONDS , scheduler ));
117
+ Subscription sub = w .subscribe (observer );
118
+
119
+ verify (observer , never ()).onNext (0L );
120
+ verify (observer , never ()).onCompleted ();
121
+ verify (observer , never ()).onError (any (Exception .class ));
122
+
123
+ scheduler .advanceTimeTo (2 , TimeUnit .SECONDS );
124
+
125
+ InOrder inOrder = inOrder (observer );
126
+ inOrder .verify (observer , times (1 )).onNext (0L );
127
+ inOrder .verify (observer , times (1 )).onNext (1L );
128
+ inOrder .verify (observer , never ()).onNext (2L );
129
+ verify (observer , never ()).onCompleted ();
130
+ verify (observer , never ()).onError (any (Exception .class ));
131
+
132
+ sub .unsubscribe ();
133
+ scheduler .advanceTimeTo (4 , TimeUnit .SECONDS );
134
+ verify (observer , never ()).onNext (2L );
135
+ verify (observer , times (1 )).onCompleted ();
136
+ verify (observer , never ()).onError (any (Exception .class ));
137
+ }
82
138
}
83
139
}
0 commit comments