Skip to content

Commit 0b3110d

Browse files
Merge Scheduler/AbstractScheduler
- using abstract class for Scheduler for same reason Observable is concrete - discussed and decided upon at ReactiveX#235
1 parent 4d7eb2e commit 0b3110d

File tree

7 files changed

+84
-130
lines changed

7 files changed

+84
-130
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 79 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,31 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20+
import rx.subscriptions.Subscriptions;
2021
import rx.util.functions.Action0;
2122
import rx.util.functions.Func0;
2223
import rx.util.functions.Func1;
2324
import rx.util.functions.Func2;
2425

2526
/**
2627
* Represents an object that schedules units of work.
28+
* <p>
29+
* The methods left to implement are:
30+
* <ul>
31+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
32+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
33+
* </ul>
34+
* <p>
35+
* Why is this an abstract class instead of an interface?
36+
* <p>
37+
* <ol>
38+
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
39+
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
40+
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the functionality.</li>
41+
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
42+
* </ol>
2743
*/
28-
public interface Scheduler {
44+
public abstract class Scheduler {
2945

3046
/**
3147
* Schedules a cancelable action to be executed.
@@ -36,7 +52,7 @@ public interface Scheduler {
3652
* Action to schedule.
3753
* @return a subscription to be able to unsubscribe from action.
3854
*/
39-
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
55+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
4056

4157
/**
4258
* Schedules a cancelable action to be executed in delayTime.
@@ -51,7 +67,7 @@ public interface Scheduler {
5167
* Time unit of the delay time.
5268
* @return a subscription to be able to unsubscribe from action.
5369
*/
54-
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
70+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
5571

5672
/**
5773
* Schedules a cancelable action to be executed.
@@ -60,7 +76,15 @@ public interface Scheduler {
6076
* Action to schedule.
6177
* @return a subscription to be able to unsubscribe from action.
6278
*/
63-
Subscription schedule(Func1<Scheduler, Subscription> action);
79+
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
80+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
81+
82+
@Override
83+
public Subscription call(Scheduler scheduler, Void t2) {
84+
return action.call(scheduler);
85+
}
86+
});
87+
}
6488

6589
/**
6690
* Schedules a cancelable action to be executed.
@@ -69,7 +93,15 @@ public interface Scheduler {
6993
* action
7094
* @return a subscription to be able to unsubscribe from action.
7195
*/
72-
Subscription schedule(Func0<Subscription> action);
96+
public Subscription schedule(final Func0<Subscription> action) {
97+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
98+
99+
@Override
100+
public Subscription call(Scheduler scheduler, Void t2) {
101+
return action.call();
102+
}
103+
});
104+
}
73105

74106
/**
75107
* Schedules an action to be executed.
@@ -78,7 +110,16 @@ public interface Scheduler {
78110
* action
79111
* @return a subscription to be able to unsubscribe from action.
80112
*/
81-
Subscription schedule(Action0 action);
113+
public Subscription schedule(final Action0 action) {
114+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
115+
116+
@Override
117+
public Subscription call(Scheduler scheduler, Void t2) {
118+
action.call();
119+
return Subscriptions.empty();
120+
}
121+
});
122+
}
82123

83124
/**
84125
* Schedules a cancelable action to be executed in delayTime.
@@ -91,7 +132,15 @@ public interface Scheduler {
91132
* Time unit of the delay time.
92133
* @return a subscription to be able to unsubscribe from action.
93134
*/
94-
Subscription schedule(Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit);
135+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
136+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
137+
138+
@Override
139+
public Subscription call(Scheduler scheduler, Void t2) {
140+
return action.call(scheduler);
141+
}
142+
}, delayTime, unit);
143+
}
95144

96145
/**
97146
* Schedules an action to be executed in delayTime.
@@ -100,7 +149,16 @@ public interface Scheduler {
100149
* action
101150
* @return a subscription to be able to unsubscribe from action.
102151
*/
103-
Subscription schedule(Action0 action, long delayTime, TimeUnit unit);
152+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
153+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
154+
155+
@Override
156+
public Subscription call(Scheduler scheduler, Void t2) {
157+
action.call();
158+
return Subscriptions.empty();
159+
}
160+
}, delayTime, unit);
161+
}
104162

105163
/**
106164
* Schedules a cancelable action to be executed in delayTime.
@@ -109,11 +167,21 @@ public interface Scheduler {
109167
* action
110168
* @return a subscription to be able to unsubscribe from action.
111169
*/
112-
Subscription schedule(Func0<Subscription> action, long delayTime, TimeUnit unit);
170+
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
171+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
172+
173+
@Override
174+
public Subscription call(Scheduler scheduler, Void t2) {
175+
return action.call();
176+
}
177+
}, delayTime, unit);
178+
}
113179

114180
/**
115-
* Returns the scheduler's notion of current time.
181+
* Returns the scheduler's notion of current absolute time in milliseconds.
116182
*/
117-
long now();
183+
public long now() {
184+
return System.currentTimeMillis();
185+
}
118186

119187
}

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

Lines changed: 0 additions & 114 deletions
This file was deleted.

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/**
3333
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
3434
*/
35-
public class CurrentThreadScheduler extends AbstractScheduler {
35+
public class CurrentThreadScheduler extends Scheduler {
3636
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
3737

3838
public static CurrentThreadScheduler getInstance() {

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
* <p>
3434
* Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events.
3535
*/
36-
public class ExecutorScheduler extends AbstractScheduler {
36+
public class ExecutorScheduler extends Scheduler {
3737
private final Executor executor;
3838

3939
public ExecutorScheduler(Executor executor) {

rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
/**
3131
* Executes work immediately on the current thread.
3232
*/
33-
public final class ImmediateScheduler extends AbstractScheduler {
33+
public final class ImmediateScheduler extends Scheduler {
3434
private static final ImmediateScheduler INSTANCE = new ImmediateScheduler();
3535

3636
public static ImmediateScheduler getInstance() {

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
/**
2929
* Schedules work on a new thread.
3030
*/
31-
public class NewThreadScheduler extends AbstractScheduler {
31+
public class NewThreadScheduler extends Scheduler {
3232
private static final NewThreadScheduler INSTANCE = new NewThreadScheduler();
3333

3434
public static NewThreadScheduler getInstance() {

rxjava-core/src/main/java/rx/operators/Tester.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ public void onNext(String args)
273273
}
274274
}
275275

276-
public static class ForwardingScheduler implements Scheduler {
276+
public static class ForwardingScheduler extends Scheduler {
277277
private final Scheduler underlying;
278278

279279
public ForwardingScheduler(Scheduler underlying) {

0 commit comments

Comments
 (0)