Skip to content

Commit 80f121c

Browse files
committed
CurrentThreadScheduler updates.
Unit test that mixes delayed and immediate scheduling. Added counter to resolve conflicts in case in time between enqueueing took less than 1 ms.
1 parent fe716b3 commit 80f121c

File tree

1 file changed

+42
-4
lines changed

1 file changed

+42
-4
lines changed

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.PriorityQueue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicInteger;
2223

2324
import org.junit.Test;
2425
import org.mockito.InOrder;
@@ -43,6 +44,8 @@ public static CurrentThreadScheduler getInstance() {
4344
private CurrentThreadScheduler() {
4445
}
4546

47+
private final AtomicInteger counter = new AtomicInteger(0);
48+
4649
@Override
4750
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
4851
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
@@ -68,7 +71,7 @@ private void enqueue(DiscardableAction<?> action, long execTime) {
6871
QUEUE.set(queue);
6972
}
7073

71-
queue.add(new TimedAction(action, execTime));
74+
queue.add(new TimedAction(action, execTime, counter.incrementAndGet()));
7275

7376
if (exec) {
7477
while (!queue.isEmpty()) {
@@ -82,15 +85,21 @@ private void enqueue(DiscardableAction<?> action, long execTime) {
8285
private static class TimedAction implements Comparable<TimedAction> {
8386
final DiscardableAction<?> action;
8487
final Long execTime;
88+
final Integer count; // In case if time between enqueueing took less than 1ms
8589

86-
private TimedAction(DiscardableAction<?> action, Long execTime) {
90+
private TimedAction(DiscardableAction<?> action, Long execTime, Integer count) {
8791
this.action = action;
8892
this.execTime = execTime;
93+
this.count = count;
8994
}
9095

9196
@Override
92-
public int compareTo(TimedAction timedAction) {
93-
return execTime.compareTo(timedAction.execTime);
97+
public int compareTo(TimedAction that) {
98+
int result = execTime.compareTo(that.execTime);
99+
if (result == 0) {
100+
return count.compareTo(that.count);
101+
}
102+
return result;
94103
}
95104
}
96105

@@ -184,6 +193,35 @@ public void call() {
184193

185194
}
186195

196+
@Test
197+
public void testMixOfDelayedAndNonDelayedActions() {
198+
final CurrentThreadScheduler scheduler = new CurrentThreadScheduler();
199+
200+
final Action0 first = mock(Action0.class);
201+
final Action0 second = mock(Action0.class);
202+
final Action0 third = mock(Action0.class);
203+
final Action0 fourth = mock(Action0.class);
204+
205+
scheduler.schedule(new Action0() {
206+
@Override
207+
public void call() {
208+
scheduler.schedule(first);
209+
scheduler.schedule(second, 300, TimeUnit.MILLISECONDS);
210+
scheduler.schedule(third, 100, TimeUnit.MILLISECONDS);
211+
scheduler.schedule(fourth);
212+
}
213+
});
214+
215+
InOrder inOrder = inOrder(first, second, third, fourth);
216+
217+
inOrder.verify(first, times(1)).call();
218+
inOrder.verify(fourth, times(1)).call();
219+
inOrder.verify(third, times(1)).call();
220+
inOrder.verify(second, times(1)).call();
221+
222+
223+
}
224+
187225
}
188226

189227
}

0 commit comments

Comments
 (0)