Skip to content

Commit d00814e

Browse files
author
jmhofer
committed
Added a TestScheduler which collects actions in a queue and has
adjustable time.
1 parent 5ee54a0 commit d00814e

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import java.util.Comparator;
19+
import java.util.PriorityQueue;
20+
import java.util.Queue;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import rx.Subscription;
24+
import rx.subscriptions.Subscriptions;
25+
import rx.util.functions.Func0;
26+
27+
public class TestScheduler extends AbstractScheduler {
28+
private final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());
29+
30+
private static class TimedAction {
31+
private final long time;
32+
private final Func0<Subscription> action;
33+
34+
private TimedAction(long time, Func0<Subscription> action) {
35+
this.time = time;
36+
this.action = action;
37+
}
38+
}
39+
40+
private static class CompareActionsByTime implements Comparator<TimedAction> {
41+
@Override
42+
public int compare(TimedAction action1, TimedAction action2) {
43+
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
44+
}
45+
}
46+
47+
private long time;
48+
49+
@Override
50+
public Subscription schedule(Func0<Subscription> action) {
51+
return schedule(action, 0L, TimeUnit.NANOSECONDS);
52+
}
53+
54+
@Override
55+
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
56+
queue.add(new TimedAction(now() + unit.toNanos(dueTime), action));
57+
return Subscriptions.empty();
58+
}
59+
60+
@Override
61+
public long now() {
62+
return time;
63+
}
64+
65+
public void advanceTimeBy(long dueTime, TimeUnit unit) {
66+
advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS);
67+
}
68+
69+
public void advanceTimeTo(long dueTime, TimeUnit unit) {
70+
long targetTime = unit.toNanos(dueTime);
71+
triggerActions(targetTime);
72+
}
73+
74+
public void triggerActions() {
75+
triggerActions(time);
76+
}
77+
78+
private void triggerActions(long targetTimeInNanos) {
79+
while (! queue.isEmpty()) {
80+
TimedAction current = queue.peek();
81+
if (current.time > targetTimeInNanos) {
82+
break;
83+
}
84+
time = current.time;
85+
queue.remove();
86+
current.action.call();
87+
}
88+
}
89+
}

0 commit comments

Comments
 (0)