Skip to content

Commit 8d3e766

Browse files
committed
Priority of the delayed actions in CurrentThreadScheduler
1 parent b66557c commit 8d3e766

File tree

3 files changed

+70
-25
lines changed

3 files changed

+70
-25
lines changed

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 54 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,8 +17,7 @@
1717

1818
import static org.mockito.Mockito.*;
1919

20-
import java.util.LinkedList;
21-
import java.util.Queue;
20+
import java.util.PriorityQueue;
2221
import java.util.concurrent.TimeUnit;
2322

2423
import org.junit.Test;
@@ -39,46 +38,62 @@ public static CurrentThreadScheduler getInstance() {
3938
return INSTANCE;
4039
}
4140

42-
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();
41+
private static final ThreadLocal<PriorityQueue<TimedAction>> QUEUE = new ThreadLocal<PriorityQueue<TimedAction>>();
4342

4443
private CurrentThreadScheduler() {
4544
}
4645

4746
@Override
4847
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
4948
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
50-
enqueue(discardableAction);
49+
enqueue(discardableAction, now());
5150
return discardableAction;
5251
}
5352

5453
@Override
5554
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
56-
// since we are executing immediately on this thread we must cause this thread to sleep
57-
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
58-
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
59-
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
55+
long execTime = now() + unit.toMillis(dueTime);
56+
57+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));
58+
enqueue(discardableAction, execTime);
59+
return discardableAction;
6060
}
6161

62-
private void enqueue(DiscardableAction<?> action) {
63-
Queue<DiscardableAction<?>> queue = QUEUE.get();
62+
private void enqueue(DiscardableAction<?> action, long execTime) {
63+
PriorityQueue<TimedAction> queue = QUEUE.get();
6464
boolean exec = queue == null;
6565

6666
if (exec) {
67-
queue = new LinkedList<DiscardableAction<?>>();
67+
queue = new PriorityQueue<TimedAction>();
6868
QUEUE.set(queue);
6969
}
7070

71-
queue.add(action);
71+
queue.add(new TimedAction(action, execTime));
7272

7373
if (exec) {
7474
while (!queue.isEmpty()) {
75-
queue.poll().call(this);
75+
queue.poll().action.call(this);
7676
}
7777

7878
QUEUE.set(null);
7979
}
8080
}
8181

82+
private static class TimedAction implements Comparable<TimedAction> {
83+
final DiscardableAction<?> action;
84+
final Long execTime;
85+
86+
private TimedAction(DiscardableAction<?> action, Long execTime) {
87+
this.action = action;
88+
this.execTime = execTime;
89+
}
90+
91+
@Override
92+
public int compareTo(TimedAction timedAction) {
93+
return execTime.compareTo(timedAction.execTime);
94+
}
95+
}
96+
8297
public static class UnitTest {
8398

8499
@Test
@@ -146,6 +161,29 @@ public void testSequenceOfActions() {
146161

147162
}
148163

164+
@Test
165+
public void testSequenceOfDelayedActions() {
166+
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
167+
168+
final Action0 first = mock(Action0.class);
169+
final Action0 second = mock(Action0.class);
170+
171+
scheduler.schedule(new Action0() {
172+
@Override
173+
public void call() {
174+
scheduler.schedule(first, 30, TimeUnit.MILLISECONDS);
175+
scheduler.schedule(second, 10, TimeUnit.MILLISECONDS);
176+
}
177+
});
178+
179+
InOrder inOrder = inOrder(first, second);
180+
181+
inOrder.verify(second, times(1)).call();
182+
inOrder.verify(first, times(1)).call();
183+
184+
185+
}
186+
149187
}
150188

151189
}

rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
4848
@Override
4949
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
5050
// since we are executing immediately on this thread we must cause this thread to sleep
51-
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
51+
long execTime = now() + unit.toMillis(dueTime);
52+
53+
return schedule(state, new SleepingAction<T>(action, this, execTime));
5254
}
5355

5456
public static class UnitTest {

rxjava-core/src/main/java/rx/concurrency/SleepingAction.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,25 @@
2626
private final Scheduler scheduler;
2727
private final long execTime;
2828

29-
public SleepingAction(Func2<Scheduler, T, Subscription> underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) {
29+
public SleepingAction(Func2<Scheduler, T, Subscription> underlying, Scheduler scheduler, long execTime) {
3030
this.underlying = underlying;
3131
this.scheduler = scheduler;
32-
this.execTime = scheduler.now() + timeUnit.toMillis(timespan);
32+
this.execTime = execTime;
3333
}
3434

35+
3536
@Override
3637
public Subscription call(Scheduler s, T state) {
37-
if (execTime < scheduler.now()) {
38-
try {
39-
Thread.sleep(scheduler.now() - execTime);
40-
} catch (InterruptedException e) {
41-
Thread.currentThread().interrupt();
42-
throw new RuntimeException(e);
38+
if (execTime > scheduler.now()) {
39+
long delay = execTime - scheduler.now();
40+
if (delay> 0) {
41+
try {
42+
Thread.sleep(delay);
43+
}
44+
catch (InterruptedException e) {
45+
Thread.currentThread().interrupt();
46+
throw new RuntimeException(e);
47+
}
4348
}
4449
}
4550

0 commit comments

Comments
 (0)