Skip to content

Update TestScheduler to match merged Scheduler changes #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 18, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 43 additions & 36 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,75 +20,82 @@
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;
import rx.util.functions.Func2;

public class TestScheduler extends AbstractScheduler {
private final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());
private static class TimedAction {
public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {
private final long time;
private final Func0<Subscription> action;
private final Func2<Scheduler, T, Subscription> action;
private final T state;
private final TestScheduler scheduler;

private TimedAction(long time, Func0<Subscription> action) {
private TimedAction(TestScheduler scheduler, long time, Func2<Scheduler, T, Subscription> action, T state) {
this.time = time;
this.action = action;
this.state = state;
this.scheduler = scheduler;
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
}
}
private static class CompareActionsByTime implements Comparator<TimedAction> {
@Override
public int compare(TimedAction action1, TimedAction action2) {
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
}

private static class CompareActionsByTime implements Comparator<TimedAction<?>> {
@Override
public int compare(TimedAction<?> action1, TimedAction<?> action2) {
return Long.valueOf(action1.time).compareTo(Long.valueOf(action2.time));
}
}

private long time;

@Override
public Subscription schedule(Func0<Subscription> action) {
return schedule(action, 0L, TimeUnit.NANOSECONDS);
}

@Override
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
queue.add(new TimedAction(now() + unit.toNanos(dueTime), action));
return Subscriptions.empty();
}

@Override
public long now() {
return time;
}

public void advanceTimeBy(long dueTime, TimeUnit unit) {
advanceTimeTo(time + unit.toNanos(dueTime), TimeUnit.NANOSECONDS);
public void advanceTimeBy(long delayTime, TimeUnit unit) {
advanceTimeTo(time + unit.toNanos(delayTime), TimeUnit.NANOSECONDS);
}
public void advanceTimeTo(long dueTime, TimeUnit unit) {
long targetTime = unit.toNanos(dueTime);

public void advanceTimeTo(long delayTime, TimeUnit unit) {
long targetTime = unit.toNanos(delayTime);
triggerActions(targetTime);
}

public void triggerActions() {
triggerActions(time);
triggerActions(time);
}


@SuppressWarnings("unchecked")
private void triggerActions(long targetTimeInNanos) {
while (! queue.isEmpty()) {
TimedAction current = queue.peek();
while (!queue.isEmpty()) {
TimedAction<?> current = queue.peek();
if (current.time > targetTimeInNanos) {
break;
}
time = current.time;
queue.remove();
current.action.call();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
return schedule(state, action, 0, TimeUnit.MILLISECONDS);
}

@Override
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, now() + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
}
}