20
20
import static org .mockito .Mockito .*;
21
21
import static rx .operators .Tester .UnitTest .*;
22
22
23
+ import java .util .concurrent .Executors ;
23
24
import java .util .concurrent .TimeUnit ;
24
25
import java .util .concurrent .atomic .AtomicBoolean ;
25
26
import java .util .concurrent .atomic .AtomicReference ;
33
34
import rx .concurrency .Schedulers ;
34
35
import rx .subscriptions .Subscriptions ;
35
36
import rx .util .functions .Action0 ;
36
- import rx .util .functions .Func0 ;
37
37
import rx .util .functions .Func1 ;
38
38
39
39
/**
@@ -45,54 +45,59 @@ public final class OperationSample {
45
45
* Samples the observable sequence at each interval.
46
46
*/
47
47
public static <T > Func1 <Observer <T >, Subscription > sample (final Observable <T > source , long interval , TimeUnit unit ) {
48
- return new Sample <T >(source , interval , unit );
48
+ return new Sample <T >(source , interval , unit , Schedulers . executor ( Executors . newSingleThreadScheduledExecutor ()) );
49
49
}
50
50
51
+ /**
52
+ * Samples the observable sequence at each interval.
53
+ */
54
+ public static <T > Func1 <Observer <T >, Subscription > sample (final Observable <T > source , long interval , TimeUnit unit , Scheduler scheduler ) {
55
+ return new Sample <T >(source , interval , unit , scheduler );
56
+ }
51
57
private static class Sample <T > implements Func1 <Observer <T >, Subscription > {
52
58
private final Observable <T > source ;
53
59
private final long interval ;
54
60
private final TimeUnit unit ;
61
+ private final Scheduler scheduler ;
55
62
56
63
private final AtomicBoolean hasValue = new AtomicBoolean ();
57
64
private final AtomicReference <T > latestValue = new AtomicReference <T >();
58
- private final AtomicBoolean sourceCompleted = new AtomicBoolean ();
59
65
60
- private Sample (Observable <T > source , long interval , TimeUnit unit ) {
66
+ private Sample (Observable <T > source , long interval , TimeUnit unit , Scheduler scheduler ) {
61
67
this .source = source ;
62
68
this .interval = interval ;
63
69
this .unit = unit ;
70
+ this .scheduler = scheduler ;
64
71
}
65
72
66
73
@ Override
67
74
public Subscription call (final Observer <T > observer ) {
68
- Clock clock = new Clock ( Schedulers . currentThread (), interval , unit );
69
- final Subscription clockSubscription = Observable . create ( clock ) .subscribe (new Observer <Long >() {
75
+ Observable < Long > clock = Observable . create ( OperationInterval . interval ( interval , unit , scheduler ) );
76
+ final Subscription clockSubscription = clock .subscribe (new Observer <Long >() {
70
77
@ Override
71
78
public void onCompleted () { /* the clock never completes */ }
72
79
73
80
@ Override
74
81
public void onError (Exception e ) { /* the clock has no errors */ }
75
82
76
83
@ Override
77
- public void onNext (Long totalTimePassed ) {
84
+ public void onNext (@ SuppressWarnings ( "unused" ) Long tick ) {
78
85
if (hasValue .get ()) {
79
86
observer .onNext (latestValue .get ());
80
87
}
81
88
}
82
89
});
83
90
84
- Subscription sourceSubscription = source .subscribe (new Observer <T >() {
91
+ final Subscription sourceSubscription = source .subscribe (new Observer <T >() {
85
92
@ Override
86
93
public void onCompleted () {
87
94
clockSubscription .unsubscribe ();
88
- sourceCompleted .set (true );
89
95
observer .onCompleted ();
90
96
}
91
97
92
98
@ Override
93
99
public void onError (Exception e ) {
94
100
clockSubscription .unsubscribe ();
95
- sourceCompleted .set (true );
96
101
observer .onError (e );
97
102
}
98
103
@@ -103,41 +108,13 @@ public void onNext(T value) {
103
108
}
104
109
});
105
110
106
- return clockSubscription ;
107
- }
108
-
109
- private class Clock implements Func1 <Observer <Long >, Subscription > {
110
- private final Scheduler scheduler ;
111
- private final long interval ;
112
- private final TimeUnit unit ;
113
-
114
- private long timePassed ;
115
-
116
- private Clock (Scheduler scheduler , long interval , TimeUnit unit ) {
117
- this .scheduler = scheduler ;
118
- this .interval = interval ;
119
- this .unit = unit ;
120
- }
121
-
122
- @ Override
123
- public Subscription call (final Observer <Long > observer ) {
124
- return scheduler .schedule (new Func0 <Subscription >() {
125
- @ Override
126
- public Subscription call () {
127
- if (! sourceCompleted .get ()) {
128
- timePassed += interval ;
129
- observer .onNext (timePassed );
130
- return Clock .this .call (observer );
131
- }
132
- return Subscriptions .create (new Action0 () {
133
- @ Override
134
- public void call () {
135
- // TODO Auto-generated method stub
136
- }
137
- });
138
- }
139
- }, interval , unit );
140
- }
111
+ return Subscriptions .create (new Action0 () {
112
+ @ Override
113
+ public void call () {
114
+ clockSubscription .unsubscribe ();
115
+ sourceSubscription .unsubscribe ();
116
+ }
117
+ });
141
118
}
142
119
}
143
120
0 commit comments