Skip to content

Commit 00a362e

Browse files
committed
Merge branch 'master' into sleepingfix
Conflicts: rxjava-core/src/main/java/rx/concurrency/SleepingAction.java
2 parents b868b24 + b66557c commit 00a362e

19 files changed

+720
-187
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import rx.operators.OperationConcat;
4242
import rx.operators.OperationDefer;
4343
import rx.operators.OperationDematerialize;
44+
import rx.operators.OperationGroupBy;
4445
import rx.operators.OperationFilter;
4546
import rx.operators.OperationFinally;
4647
import rx.operators.OperationMap;
@@ -59,16 +60,15 @@
5960
import rx.operators.OperationSynchronize;
6061
import rx.operators.OperationTake;
6162
import rx.operators.OperationTakeLast;
63+
import rx.operators.OperationTakeUntil;
6264
import rx.operators.OperationTakeWhile;
65+
import rx.operators.OperationToIterator;
6366
import rx.operators.OperationToObservableFuture;
6467
import rx.operators.OperationToObservableIterable;
6568
import rx.operators.OperationToObservableList;
6669
import rx.operators.OperationToObservableSortedList;
6770
import rx.operators.OperationWhere;
6871
import rx.operators.OperationZip;
69-
import rx.operators.OperatorGroupBy;
70-
import rx.operators.OperatorTakeUntil;
71-
import rx.operators.OperatorToIterator;
7272
import rx.plugins.RxJavaErrorHandler;
7373
import rx.plugins.RxJavaObservableExecutionHook;
7474
import rx.plugins.RxJavaPlugins;
@@ -1303,7 +1303,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
13031303
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
13041304
*/
13051305
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
1306-
return OperatorTakeUntil.takeUntil(source, other);
1306+
return OperationTakeUntil.takeUntil(source, other);
13071307
}
13081308

13091309
/**
@@ -1355,7 +1355,7 @@ public static <T> Observable<T> finallyDo(Observable<T> source, Action0 action)
13551355
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
13561356
*/
13571357
public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T> source, final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
1358-
return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector));
1358+
return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
13591359
}
13601360

13611361
/**
@@ -1372,7 +1372,7 @@ public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T
13721372
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
13731373
*/
13741374
public static <K, T> Observable<GroupedObservable<K, T>> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
1375-
return create(OperatorGroupBy.groupBy(source, keySelector));
1375+
return create(OperationGroupBy.groupBy(source, keySelector));
13761376
}
13771377

13781378
/**
@@ -2041,7 +2041,7 @@ public Iterator<T> iterator() {
20412041
* @return the iterator that could be used to iterate over the elements of the observable.
20422042
*/
20432043
public static <T> Iterator<T> getIterator(Observable<T> that) {
2044-
return OperatorToIterator.toIterator(that);
2044+
return OperationToIterator.toIterator(that);
20452045
}
20462046

20472047
/**

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 150 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,99 @@
1515
*/
1616
package rx;
1717

18+
import java.util.Date;
1819
import java.util.concurrent.TimeUnit;
1920

21+
import rx.subscriptions.Subscriptions;
2022
import rx.util.functions.Action0;
2123
import rx.util.functions.Func0;
24+
import rx.util.functions.Func1;
25+
import rx.util.functions.Func2;
2226

2327
/**
2428
* Represents an object that schedules units of work.
29+
* <p>
30+
* The methods left to implement are:
31+
* <ul>
32+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
33+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
34+
* </ul>
35+
* <p>
36+
* Why is this an abstract class instead of an interface?
37+
* <p>
38+
* <ol>
39+
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
40+
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
41+
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
42+
* functionality.</li>
43+
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
44+
* </ol>
2545
*/
26-
public interface Scheduler {
46+
public abstract class Scheduler {
47+
48+
/**
49+
* Schedules a cancelable action to be executed.
50+
*
51+
* @param state
52+
* State to pass into the action.
53+
* @param action
54+
* Action to schedule.
55+
* @return a subscription to be able to unsubscribe from action.
56+
*/
57+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
58+
59+
/**
60+
* Schedules a cancelable action to be executed in delayTime.
61+
*
62+
* @param state
63+
* State to pass into the action.
64+
* @param action
65+
* Action to schedule.
66+
* @param delayTime
67+
* Time the action is to be delayed before executing.
68+
* @param unit
69+
* Time unit of the delay time.
70+
* @return a subscription to be able to unsubscribe from action.
71+
*/
72+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
73+
74+
/**
75+
* Schedules a cancelable action to be executed at dueTime.
76+
*
77+
* @param state
78+
* State to pass into the action.
79+
* @param action
80+
* Action to schedule.
81+
* @param dueTime
82+
* Time the action is to be executed. If in the past it will be executed immediately.
83+
* @return a subscription to be able to unsubscribe from action.
84+
*/
85+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
86+
long scheduledTime = dueTime.getTime();
87+
long timeInFuture = scheduledTime - now();
88+
if (timeInFuture <= 0) {
89+
return schedule(state, action);
90+
} else {
91+
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
92+
}
93+
}
94+
95+
/**
96+
* Schedules a cancelable action to be executed.
97+
*
98+
* @param action
99+
* Action to schedule.
100+
* @return a subscription to be able to unsubscribe from action.
101+
*/
102+
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
103+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
104+
105+
@Override
106+
public Subscription call(Scheduler scheduler, Void t2) {
107+
return action.call(scheduler);
108+
}
109+
});
110+
}
27111

28112
/**
29113
* Schedules a cancelable action to be executed.
@@ -32,7 +116,15 @@ public interface Scheduler {
32116
* action
33117
* @return a subscription to be able to unsubscribe from action.
34118
*/
35-
Subscription schedule(Func0<Subscription> action);
119+
public Subscription schedule(final Func0<Subscription> action) {
120+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
121+
122+
@Override
123+
public Subscription call(Scheduler scheduler, Void t2) {
124+
return action.call();
125+
}
126+
});
127+
}
36128

37129
/**
38130
* Schedules an action to be executed.
@@ -41,29 +133,78 @@ public interface Scheduler {
41133
* action
42134
* @return a subscription to be able to unsubscribe from action.
43135
*/
44-
Subscription schedule(Action0 action);
136+
public Subscription schedule(final Action0 action) {
137+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
138+
139+
@Override
140+
public Subscription call(Scheduler scheduler, Void t2) {
141+
action.call();
142+
return Subscriptions.empty();
143+
}
144+
});
145+
}
45146

46147
/**
47-
* Schedules an action to be executed in dueTime.
148+
* Schedules a cancelable action to be executed in delayTime.
149+
*
150+
* @param action
151+
* Action to schedule.
152+
* @param delayTime
153+
* Time the action is to be delayed before executing.
154+
* @param unit
155+
* Time unit of the delay time.
156+
* @return a subscription to be able to unsubscribe from action.
157+
*/
158+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
159+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
160+
161+
@Override
162+
public Subscription call(Scheduler scheduler, Void t2) {
163+
return action.call(scheduler);
164+
}
165+
}, delayTime, unit);
166+
}
167+
168+
/**
169+
* Schedules an action to be executed in delayTime.
48170
*
49171
* @param action
50172
* action
51173
* @return a subscription to be able to unsubscribe from action.
52174
*/
53-
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
175+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
176+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
177+
178+
@Override
179+
public Subscription call(Scheduler scheduler, Void t2) {
180+
action.call();
181+
return Subscriptions.empty();
182+
}
183+
}, delayTime, unit);
184+
}
54185

55186
/**
56-
* Schedules a cancelable action to be executed in dueTime.
187+
* Schedules a cancelable action to be executed in delayTime.
57188
*
58189
* @param action
59190
* action
60191
* @return a subscription to be able to unsubscribe from action.
61192
*/
62-
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
193+
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
194+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
195+
196+
@Override
197+
public Subscription call(Scheduler scheduler, Void t2) {
198+
return action.call();
199+
}
200+
}, delayTime, unit);
201+
}
63202

64203
/**
65-
* Returns the scheduler's notion of current time.
204+
* Returns the scheduler's notion of current absolute time in milliseconds.
66205
*/
67-
long now();
206+
public long now() {
207+
return System.currentTimeMillis();
208+
}
68209

69210
}

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,55 @@
2424
import org.junit.Test;
2525
import org.mockito.InOrder;
2626

27+
import rx.Scheduler;
2728
import rx.Subscription;
2829
import rx.util.functions.Action0;
29-
import rx.util.functions.Func0;
30+
import rx.util.functions.Func2;
3031

3132
/**
3233
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
3334
*/
34-
public class CurrentThreadScheduler extends AbstractScheduler {
35+
public class CurrentThreadScheduler extends Scheduler {
3536
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
3637

3738
public static CurrentThreadScheduler getInstance() {
3839
return INSTANCE;
3940
}
4041

41-
private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();
42+
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();
4243

4344
private CurrentThreadScheduler() {
4445
}
4546

4647
@Override
47-
public Subscription schedule(Func0<Subscription> action) {
48-
DiscardableAction discardableAction = new DiscardableAction(action);
48+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
49+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
4950
enqueue(discardableAction);
5051
return discardableAction;
5152
}
5253

5354
@Override
54-
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
55-
return schedule(new SleepingAction(action, this, dueTime, unit));
55+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
56+
// since we are executing immediately on this thread we must cause this thread to sleep
57+
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
58+
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
59+
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
5660
}
5761

58-
private void enqueue(DiscardableAction action) {
59-
Queue<DiscardableAction> queue = QUEUE.get();
62+
private void enqueue(DiscardableAction<?> action) {
63+
Queue<DiscardableAction<?>> queue = QUEUE.get();
6064
boolean exec = queue == null;
6165

6266
if (exec) {
63-
queue = new LinkedList<DiscardableAction>();
67+
queue = new LinkedList<DiscardableAction<?>>();
6468
QUEUE.set(queue);
6569
}
6670

6771
queue.add(action);
6872

6973
if (exec) {
7074
while (!queue.isEmpty()) {
71-
queue.poll().call();
75+
queue.poll().call(this);
7276
}
7377

7478
QUEUE.set(null);
@@ -143,4 +147,5 @@ public void testSequenceOfActions() {
143147
}
144148

145149
}
150+
146151
}

0 commit comments

Comments
 (0)