Skip to content

Commit 5c70773

Browse files
Implement Scheduler method with dueTime
- added method: schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime)
1 parent 0b3110d commit 5c70773

File tree

2 files changed

+60
-1
lines changed

2 files changed

+60
-1
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx;
1717

18+
import java.util.Date;
1819
import java.util.concurrent.TimeUnit;
1920

2021
import rx.subscriptions.Subscriptions;
@@ -37,7 +38,8 @@
3738
* <ol>
3839
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
3940
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
40-
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the functionality.</li>
41+
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
42+
* functionality.</li>
4143
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
4244
* </ol>
4345
*/
@@ -69,6 +71,27 @@ public abstract class Scheduler {
6971
*/
7072
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
7173

74+
/**
75+
* Schedules a cancelable action to be executed at dueTime.
76+
*
77+
* @param state
78+
* State to pass into the action.
79+
* @param action
80+
* Action to schedule.
81+
* @param dueTime
82+
* Time the action is to be executed. If in the past it will be executed immediately.
83+
* @return a subscription to be able to unsubscribe from action.
84+
*/
85+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
86+
long scheduledTime = dueTime.getTime();
87+
long timeInFuture = scheduledTime - now();
88+
if (timeInFuture <= 0) {
89+
return schedule(state, action);
90+
} else {
91+
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
92+
}
93+
}
94+
7295
/**
7396
* Schedules a cancelable action to be executed.
7497
*

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.junit.Assert.*;
1919
import static org.mockito.Mockito.*;
2020

21+
import java.util.Date;
2122
import java.util.concurrent.CountDownLatch;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.atomic.AtomicBoolean;
@@ -359,4 +360,39 @@ public void onNext(Integer args) {
359360
assertTrue(completed.get());
360361
}
361362

363+
@Test
364+
public void testSchedulingWithDueTime() throws InterruptedException {
365+
366+
final CountDownLatch latch = new CountDownLatch(5);
367+
final AtomicInteger counter = new AtomicInteger();
368+
369+
long start = System.currentTimeMillis();
370+
371+
Schedulers.threadPoolForComputation().schedule(null, new Func2<Scheduler, String, Subscription>() {
372+
373+
@Override
374+
public Subscription call(Scheduler scheduler, String state) {
375+
System.out.println("doing work");
376+
latch.countDown();
377+
counter.incrementAndGet();
378+
if (latch.getCount() == 0) {
379+
return Subscriptions.empty();
380+
} else {
381+
return scheduler.schedule(state, this, new Date(System.currentTimeMillis() + 50));
382+
}
383+
}
384+
}, new Date(System.currentTimeMillis() + 100));
385+
386+
if (!latch.await(3000, TimeUnit.MILLISECONDS)) {
387+
fail("didn't execute ... timed out");
388+
}
389+
390+
long end = System.currentTimeMillis();
391+
392+
assertEquals(5, counter.get());
393+
if ((end - start) < 250) {
394+
fail("it should have taken over 250ms since each step was scheduled 50ms in the future");
395+
}
396+
}
397+
362398
}

0 commit comments

Comments
 (0)