Skip to content

Commit 9d4e1c9

Browse files
Update TestScheduler to match merged Scheduler changes
- I accidentally merged this code before fixing it to match the changes to AbstractScheduler/Scheduler
1 parent e0b4a68 commit 9d4e1c9

File tree

1 file changed

+43
-36
lines changed

1 file changed

+43
-36
lines changed

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 43 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,75 +20,82 @@
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
2222

23+
import rx.Scheduler;
2324
import rx.Subscription;
2425
import rx.subscriptions.Subscriptions;
25-
import rx.util.functions.Func0;
26+
import rx.util.functions.Func2;
2627

27-
public class TestScheduler extends AbstractScheduler {
28-
private final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());
29-
30-
private static class TimedAction {
28+
public class TestScheduler extends Scheduler {
29+
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
30+
31+
private static class TimedAction<T> {
3132
private final long time;
32-
private final Func0<Subscription> action;
33+
private final Func2<Scheduler, T, Subscription> action;
34+
private final T state;
35+
private final TestScheduler scheduler;
3336

34-
private TimedAction(long time, Func0<Subscription> action) {
37+
private TimedAction(TestScheduler scheduler, long time, Func2<Scheduler, T, Subscription> action, T state) {
3538
this.time = time;
3639
this.action = action;
40+
this.state = state;
41+
this.scheduler = scheduler;
3742
}
38-
43+
3944
@Override
4045
public String toString() {
4146
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
4247
}
4348
}
44-
45-
private static class CompareActionsByTime implements Comparator<TimedAction> {
46-
@Override
47-
public int compare(TimedAction action1, TimedAction action2) {
48-
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
49-
}
49+
50+
private static class CompareActionsByTime implements Comparator<TimedAction<?>> {
51+
@Override
52+
public int compare(TimedAction<?> action1, TimedAction<?> action2) {
53+
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
54+
}
5055
}
51-
56+
5257
private long time;
53-
54-
@Override
55-
public Subscription schedule(Func0<Subscription> action) {
56-
return schedule(action, 0L, TimeUnit.NANOSECONDS);
57-
}
5858

59-
@Override
60-
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
61-
queue.add(new TimedAction(now() + unit.toNanos(dueTime), action));
62-
return Subscriptions.empty();
63-
}
64-
6559
@Override
6660
public long now() {
6761
return time;
6862
}
6963

70-
public void advanceTimeBy(long dueTime, TimeUnit unit) {
71-
advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS);
64+
public void advanceTimeBy(long delayTime, TimeUnit unit) {
65+
advanceTimeTo(time + unit.toNanos(delayTime), TimeUnit.NANOSECONDS);
7266
}
73-
74-
public void advanceTimeTo(long dueTime, TimeUnit unit) {
75-
long targetTime = unit.toNanos(dueTime);
67+
68+
public void advanceTimeTo(long delayTime, TimeUnit unit) {
69+
long targetTime = unit.toNanos(delayTime);
7670
triggerActions(targetTime);
7771
}
7872

7973
public void triggerActions() {
80-
triggerActions(time);
74+
triggerActions(time);
8175
}
82-
76+
77+
@SuppressWarnings("unchecked")
8378
private void triggerActions(long targetTimeInNanos) {
84-
while (! queue.isEmpty()) {
85-
TimedAction current = queue.peek();
79+
while (!queue.isEmpty()) {
80+
TimedAction<?> current = queue.peek();
8681
if (current.time > targetTimeInNanos) {
8782
break;
8883
}
8984
time = current.time;
9085
queue.remove();
91-
current.action.call();
86+
// because the queue can have wildcards we have to ignore the type T for the state
87+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
9288
}
9389
}
90+
91+
@Override
92+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
93+
return schedule(state, action, 0, TimeUnit.MILLISECONDS);
94+
}
95+
96+
@Override
97+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
98+
queue.add(new TimedAction<T>(this, now() + unit.toNanos(delayTime), action, state));
99+
return Subscriptions.empty();
100+
}
94101
}

0 commit comments

Comments
 (0)