Skip to content

Commit f2966a6

Browse files
author
jmhofer
committed
Merge branch 'master' into sample
2 parents 3ac0d0b + 874554f commit f2966a6

34 files changed

+2800
-555
lines changed

CHANGES.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,18 @@
11
# RxJava Releases #
22

3+
### Version 0.8.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.8.0%22)) ###
4+
5+
This is a breaking (non-backwards compatible) release that updates the Scheduler implementation released in 0.7.0.
6+
7+
See See https://github.com/Netflix/RxJava/issues/19 for background, discussion and status of Schedulers.
8+
9+
It is believed that the public signatures of Scheduler and related objects is now stabilized but ongoing feedback and review by the community could still result in changes.
10+
11+
* [Issue 19](https://github.com/Netflix/RxJava/issues/19) Schedulers improvements, changes and additions
12+
* [Issue 202](https://github.com/Netflix/RxJava/issues/202) Fix Concat bugs
13+
* [Issue 65](https://github.com/Netflix/RxJava/issues/65) Multicast
14+
* [Pull 218](https://github.com/Netflix/RxJava/pull/218) ReplaySubject
15+
316
### Version 0.7.0 ([Maven Central](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.netflix.rxjava%22%20AND%20v%3A%220.7.0%22)) ###
417

518
This release adds the foundations of Rx Schedulers.

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version=0.7.1-SNAPSHOT
1+
version=0.8.1-SNAPSHOT

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,18 +36,21 @@
3636
import org.mockito.Mockito;
3737
import org.mockito.MockitoAnnotations;
3838

39+
import rx.observables.ConnectableObservable;
3940
import rx.observables.GroupedObservable;
4041
import rx.operators.OperationAll;
4142
import rx.operators.OperationConcat;
4243
import rx.operators.OperationDefer;
4344
import rx.operators.OperationDematerialize;
45+
import rx.operators.OperationGroupBy;
4446
import rx.operators.OperationFilter;
4547
import rx.operators.OperationFinally;
4648
import rx.operators.OperationMap;
4749
import rx.operators.OperationMaterialize;
4850
import rx.operators.OperationMerge;
4951
import rx.operators.OperationMergeDelayError;
5052
import rx.operators.OperationMostRecent;
53+
import rx.operators.OperationMulticast;
5154
import rx.operators.OperationNext;
5255
import rx.operators.OperationObserveOn;
5356
import rx.operators.OperationOnErrorResumeNextViaFunction;
@@ -59,19 +62,19 @@
5962
import rx.operators.OperationSynchronize;
6063
import rx.operators.OperationTake;
6164
import rx.operators.OperationTakeLast;
65+
import rx.operators.OperationTakeUntil;
6266
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationToIterator;
6368
import rx.operators.OperationToObservableFuture;
6469
import rx.operators.OperationToObservableIterable;
6570
import rx.operators.OperationToObservableList;
6671
import rx.operators.OperationToObservableSortedList;
6772
import rx.operators.OperationWhere;
6873
import rx.operators.OperationZip;
69-
import rx.operators.OperatorGroupBy;
70-
import rx.operators.OperatorTakeUntil;
71-
import rx.operators.OperatorToIterator;
7274
import rx.plugins.RxJavaErrorHandler;
7375
import rx.plugins.RxJavaObservableExecutionHook;
7476
import rx.plugins.RxJavaPlugins;
77+
import rx.subjects.Subject;
7578
import rx.subscriptions.BooleanSubscription;
7679
import rx.subscriptions.Subscriptions;
7780
import rx.util.AtomicObservableSubscription;
@@ -585,6 +588,17 @@ public void call(Object args) {
585588
});
586589
}
587590

591+
/**
592+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
593+
*
594+
* @param subject the subject to push source elements into.
595+
* @param <R> result type
596+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
597+
*/
598+
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
599+
return multicast(this, subject);
600+
}
601+
588602
/**
589603
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
590604
*
@@ -1303,7 +1317,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
13031317
* @return An observable sequence containing the elements of the source sequence up to the point the other sequence interrupted further propagation.
13041318
*/
13051319
public static <T, E> Observable<T> takeUntil(final Observable<T> source, final Observable<E> other) {
1306-
return OperatorTakeUntil.takeUntil(source, other);
1320+
return OperationTakeUntil.takeUntil(source, other);
13071321
}
13081322

13091323
/**
@@ -1355,7 +1369,7 @@ public static <T> Observable<T> finallyDo(Observable<T> source, Action0 action)
13551369
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
13561370
*/
13571371
public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T> source, final Func1<T, K> keySelector, final Func1<T, R> elementSelector) {
1358-
return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector));
1372+
return create(OperationGroupBy.groupBy(source, keySelector, elementSelector));
13591373
}
13601374

13611375
/**
@@ -1372,7 +1386,7 @@ public static <K, T, R> Observable<GroupedObservable<K, R>> groupBy(Observable<T
13721386
* @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
13731387
*/
13741388
public static <K, T> Observable<GroupedObservable<K, T>> groupBy(Observable<T> source, final Func1<T, K> keySelector) {
1375-
return create(OperatorGroupBy.groupBy(source, keySelector));
1389+
return create(OperationGroupBy.groupBy(source, keySelector));
13761390
}
13771391

13781392
/**
@@ -2041,7 +2055,7 @@ public Iterator<T> iterator() {
20412055
* @return the iterator that could be used to iterate over the elements of the observable.
20422056
*/
20432057
public static <T> Iterator<T> getIterator(Observable<T> that) {
2044-
return OperatorToIterator.toIterator(that);
2058+
return OperationToIterator.toIterator(that);
20452059
}
20462060

20472061
/**
@@ -2072,9 +2086,22 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
20722086
return OperationMostRecent.mostRecent(source, initialValue);
20732087
}
20742088

2089+
/**
2090+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2091+
*
2092+
* @param source the source sequence whose elements will be pushed into the specified subject.
2093+
* @param subject the subject to push source elements into.
2094+
* @param <T> source type
2095+
* @param <R> result type
2096+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2097+
*/
2098+
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
2099+
return OperationMulticast.multicast(source, subject);
2100+
}
2101+
20752102
/**
20762103
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
2077-
*
2104+
*
20782105
* @param that
20792106
* the source Observable
20802107
* @return The single element in the observable sequence.

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 150 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,99 @@
1515
*/
1616
package rx;
1717

18+
import java.util.Date;
1819
import java.util.concurrent.TimeUnit;
1920

21+
import rx.subscriptions.Subscriptions;
2022
import rx.util.functions.Action0;
2123
import rx.util.functions.Func0;
24+
import rx.util.functions.Func1;
25+
import rx.util.functions.Func2;
2226

2327
/**
2428
* Represents an object that schedules units of work.
29+
* <p>
30+
* The methods left to implement are:
31+
* <ul>
32+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
33+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
34+
* </ul>
35+
* <p>
36+
* Why is this an abstract class instead of an interface?
37+
* <p>
38+
* <ol>
39+
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
40+
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
41+
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
42+
* functionality.</li>
43+
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
44+
* </ol>
2545
*/
26-
public interface Scheduler {
46+
public abstract class Scheduler {
47+
48+
/**
49+
* Schedules a cancelable action to be executed.
50+
*
51+
* @param state
52+
* State to pass into the action.
53+
* @param action
54+
* Action to schedule.
55+
* @return a subscription to be able to unsubscribe from action.
56+
*/
57+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
58+
59+
/**
60+
* Schedules a cancelable action to be executed in delayTime.
61+
*
62+
* @param state
63+
* State to pass into the action.
64+
* @param action
65+
* Action to schedule.
66+
* @param delayTime
67+
* Time the action is to be delayed before executing.
68+
* @param unit
69+
* Time unit of the delay time.
70+
* @return a subscription to be able to unsubscribe from action.
71+
*/
72+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
73+
74+
/**
75+
* Schedules a cancelable action to be executed at dueTime.
76+
*
77+
* @param state
78+
* State to pass into the action.
79+
* @param action
80+
* Action to schedule.
81+
* @param dueTime
82+
* Time the action is to be executed. If in the past it will be executed immediately.
83+
* @return a subscription to be able to unsubscribe from action.
84+
*/
85+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
86+
long scheduledTime = dueTime.getTime();
87+
long timeInFuture = scheduledTime - now();
88+
if (timeInFuture <= 0) {
89+
return schedule(state, action);
90+
} else {
91+
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
92+
}
93+
}
94+
95+
/**
96+
* Schedules a cancelable action to be executed.
97+
*
98+
* @param action
99+
* Action to schedule.
100+
* @return a subscription to be able to unsubscribe from action.
101+
*/
102+
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
103+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
104+
105+
@Override
106+
public Subscription call(Scheduler scheduler, Void t2) {
107+
return action.call(scheduler);
108+
}
109+
});
110+
}
27111

28112
/**
29113
* Schedules a cancelable action to be executed.
@@ -32,7 +116,15 @@ public interface Scheduler {
32116
* action
33117
* @return a subscription to be able to unsubscribe from action.
34118
*/
35-
Subscription schedule(Func0<Subscription> action);
119+
public Subscription schedule(final Func0<Subscription> action) {
120+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
121+
122+
@Override
123+
public Subscription call(Scheduler scheduler, Void t2) {
124+
return action.call();
125+
}
126+
});
127+
}
36128

37129
/**
38130
* Schedules an action to be executed.
@@ -41,29 +133,78 @@ public interface Scheduler {
41133
* action
42134
* @return a subscription to be able to unsubscribe from action.
43135
*/
44-
Subscription schedule(Action0 action);
136+
public Subscription schedule(final Action0 action) {
137+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
138+
139+
@Override
140+
public Subscription call(Scheduler scheduler, Void t2) {
141+
action.call();
142+
return Subscriptions.empty();
143+
}
144+
});
145+
}
45146

46147
/**
47-
* Schedules an action to be executed in dueTime.
148+
* Schedules a cancelable action to be executed in delayTime.
149+
*
150+
* @param action
151+
* Action to schedule.
152+
* @param delayTime
153+
* Time the action is to be delayed before executing.
154+
* @param unit
155+
* Time unit of the delay time.
156+
* @return a subscription to be able to unsubscribe from action.
157+
*/
158+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
159+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
160+
161+
@Override
162+
public Subscription call(Scheduler scheduler, Void t2) {
163+
return action.call(scheduler);
164+
}
165+
}, delayTime, unit);
166+
}
167+
168+
/**
169+
* Schedules an action to be executed in delayTime.
48170
*
49171
* @param action
50172
* action
51173
* @return a subscription to be able to unsubscribe from action.
52174
*/
53-
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
175+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
176+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
177+
178+
@Override
179+
public Subscription call(Scheduler scheduler, Void t2) {
180+
action.call();
181+
return Subscriptions.empty();
182+
}
183+
}, delayTime, unit);
184+
}
54185

55186
/**
56-
* Schedules a cancelable action to be executed in dueTime.
187+
* Schedules a cancelable action to be executed in delayTime.
57188
*
58189
* @param action
59190
* action
60191
* @return a subscription to be able to unsubscribe from action.
61192
*/
62-
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
193+
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
194+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
195+
196+
@Override
197+
public Subscription call(Scheduler scheduler, Void t2) {
198+
return action.call();
199+
}
200+
}, delayTime, unit);
201+
}
63202

64203
/**
65-
* Returns the scheduler's notion of current time.
204+
* Returns the scheduler's notion of current absolute time in milliseconds.
66205
*/
67-
long now();
206+
public long now() {
207+
return System.currentTimeMillis();
208+
}
68209

69210
}

0 commit comments

Comments
 (0)