Skip to content

Commit 3fb72d6

Browse files
author
jmhofer
committed
ExecutorScheduler now uses scheduleAtFixedRate instead of recursion when possible.
1 parent f2ef8f0 commit 3fb72d6

File tree

2 files changed

+23
-0
lines changed

2 files changed

+23
-0
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Date;
22+
import java.util.concurrent.Executors;
2223
import java.util.concurrent.TimeUnit;
2324
import java.util.concurrent.atomic.AtomicBoolean;
2425

2526
import org.junit.Test;
2627
import org.mockito.InOrder;
2728
import org.mockito.Mockito;
2829

30+
import rx.concurrency.Schedulers;
2931
import rx.concurrency.TestScheduler;
3032
import rx.subscriptions.Subscriptions;
3133
import rx.util.functions.Action0;

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
4444
this.executor = executor;
4545
}
4646

47+
@Override
48+
public <T> Subscription schedulePeriodically(final T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
49+
if (executor instanceof ScheduledExecutorService) {
50+
final CompositeSubscription subscriptions = new CompositeSubscription();
51+
52+
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
53+
@Override
54+
public void run() {
55+
Subscription s = action.call(ExecutorScheduler.this, state);
56+
subscriptions.add(s);
57+
}
58+
}, initialDelay, period, unit);
59+
60+
subscriptions.add(Subscriptions.create(f));
61+
return subscriptions;
62+
63+
} else {
64+
return super.schedulePeriodically(state, action, initialDelay, period, unit);
65+
}
66+
}
67+
4768
@Override
4869
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
4970
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);

0 commit comments

Comments
 (0)