Skip to content

Commit 84fa62d

Browse files
author
jmhofer
committed
First attempt at "sample" operator. Should probably use "interval"
operator instead of an internal clock. Also still needs tests!
1 parent e119a88 commit 84fa62d

File tree

1 file changed

+147
-0
lines changed

1 file changed

+147
-0
lines changed
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
import static rx.operators.Tester.UnitTest.*;
22+
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicBoolean;
25+
import java.util.concurrent.atomic.AtomicReference;
26+
27+
import org.junit.Test;
28+
29+
import rx.Observable;
30+
import rx.Observer;
31+
import rx.Scheduler;
32+
import rx.Subscription;
33+
import rx.concurrency.Schedulers;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func0;
37+
import rx.util.functions.Func1;
38+
39+
/**
40+
* Samples the observable sequence at each interval.
41+
*/
42+
public final class OperationSample {
43+
44+
/**
45+
* Samples the observable sequence at each interval.
46+
*/
47+
public static <T> Func1<Observer<T>, Subscription> sample(final Observable<T> source, long interval, TimeUnit unit) {
48+
return new Sample<T>(source, interval, unit);
49+
}
50+
51+
private static class Sample<T> implements Func1<Observer<T>, Subscription> {
52+
private final Observable<T> source;
53+
private final long interval;
54+
private final TimeUnit unit;
55+
56+
private final AtomicBoolean hasValue = new AtomicBoolean();
57+
private final AtomicReference<T> latestValue = new AtomicReference<T>();
58+
private final AtomicBoolean sourceCompleted = new AtomicBoolean();
59+
60+
private Sample(Observable<T> source, long interval, TimeUnit unit) {
61+
this.source = source;
62+
this.interval = interval;
63+
this.unit = unit;
64+
}
65+
66+
@Override
67+
public Subscription call(final Observer<T> observer) {
68+
Clock clock = new Clock(Schedulers.currentThread(), interval, unit);
69+
final Subscription clockSubscription = Observable.create(clock).subscribe(new Observer<Long>() {
70+
@Override
71+
public void onCompleted() { /* the clock never completes */ }
72+
73+
@Override
74+
public void onError(Exception e) { /* the clock has no errors */ }
75+
76+
@Override
77+
public void onNext(Long totalTimePassed) {
78+
if (hasValue.get()) {
79+
observer.onNext(latestValue.get());
80+
}
81+
}
82+
});
83+
84+
Subscription sourceSubscription = source.subscribe(new Observer<T>() {
85+
@Override
86+
public void onCompleted() {
87+
clockSubscription.unsubscribe();
88+
sourceCompleted.set(true);
89+
observer.onCompleted();
90+
}
91+
92+
@Override
93+
public void onError(Exception e) {
94+
clockSubscription.unsubscribe();
95+
sourceCompleted.set(true);
96+
observer.onError(e);
97+
}
98+
99+
@Override
100+
public void onNext(T value) {
101+
latestValue.set(value);
102+
hasValue.set(true);
103+
}
104+
});
105+
106+
return clockSubscription;
107+
}
108+
109+
private class Clock implements Func1<Observer<Long>, Subscription> {
110+
private final Scheduler scheduler;
111+
private final long interval;
112+
private final TimeUnit unit;
113+
114+
private long timePassed;
115+
116+
private Clock(Scheduler scheduler, long interval, TimeUnit unit) {
117+
this.scheduler = scheduler;
118+
this.interval = interval;
119+
this.unit = unit;
120+
}
121+
122+
@Override
123+
public Subscription call(final Observer<Long> observer) {
124+
return scheduler.schedule(new Func0<Subscription>() {
125+
@Override
126+
public Subscription call() {
127+
if (! sourceCompleted.get()) {
128+
timePassed += interval;
129+
observer.onNext(timePassed);
130+
return Clock.this.call(observer);
131+
}
132+
return Subscriptions.create(new Action0() {
133+
@Override
134+
public void call() {
135+
// TODO Auto-generated method stub
136+
}
137+
});
138+
}
139+
}, interval, unit);
140+
}
141+
}
142+
}
143+
144+
public static class UnitTest {
145+
// TODO
146+
}
147+
}

0 commit comments

Comments
 (0)