Skip to content

Commit 067afbe

Browse files
committed
Break up the Worker as Completable into Worker as Observable. Now the schedules actions are the indivisible elements that are subscribed to. The user has additional choice at the cost of making the API more complicated.
1 parent 4752fa3 commit 067afbe

File tree

2 files changed

+34
-16
lines changed

2 files changed

+34
-16
lines changed

src/main/java/rx/schedulers/ScheduleWhen.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,13 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121

2222
import rx.Completable;
23+
import rx.Completable.CompletableOnSubscribe;
24+
import rx.Completable.CompletableSubscriber;
2325
import rx.Observable;
2426
import rx.Scheduler;
2527
import rx.Subscription;
2628
import rx.annotations.Experimental;
2729
import rx.functions.Action0;
28-
import rx.functions.Action1;
2930
import rx.functions.Func1;
3031
import rx.internal.operators.BufferUntilSubscriber;
3132
import rx.subjects.PublishSubject;
@@ -74,9 +75,9 @@
7475
@Experimental
7576
public class ScheduleWhen extends Scheduler {
7677
private final Scheduler actualScheduler;
77-
private final PublishSubject<Completable> workerQueue;
78+
private final PublishSubject<Observable<Completable>> workerQueue;
7879

79-
public ScheduleWhen(Func1<Observable<Completable>, Completable> combine, Scheduler actualScheduler) {
80+
public ScheduleWhen(Func1<Observable<Observable<Completable>>, Completable> combine, Scheduler actualScheduler) {
8081
this.actualScheduler = actualScheduler;
8182
// workers are converted into completables and put in this queue.
8283
this.workerQueue = PublishSubject.create();
@@ -87,18 +88,24 @@ public ScheduleWhen(Func1<Observable<Completable>, Completable> combine, Schedul
8788

8889
@Override
8990
public Worker createWorker() {
91+
final Worker actualWorker = actualScheduler.createWorker();
9092
// a queue for the actions submitted while worker is waiting to get to
9193
// the subscribe to off the workerQueue.
92-
final BufferUntilSubscriber<ScheduledAction> actionQueue = BufferUntilSubscriber.create();
93-
final Worker actualWorker = actualScheduler.createWorker();
94-
94+
final BufferUntilSubscriber<ScheduledAction> actionQueue = BufferUntilSubscriber.<ScheduledAction>create();
9595
// convert the work of scheduling all the actions into a completable
96-
Completable completable = actionQueue.doOnNext(new Action1<ScheduledAction>() {
96+
Observable<Completable> actions = actionQueue.map(new Func1<ScheduledAction, Completable>() {
9797
@Override
98-
public void call(ScheduledAction action) {
99-
action.call(actualWorker);
98+
public Completable call(final ScheduledAction action) {
99+
return Completable.create(new CompletableOnSubscribe() {
100+
@Override
101+
public void call(CompletableSubscriber actionCompletable) {
102+
actionCompletable.onSubscribe(action);
103+
action.call(actualWorker);
104+
actionCompletable.onCompleted();
105+
}
106+
});
100107
}
101-
}).toCompletable();
108+
});
102109

103110
// a worker that queues the action to the actionQueue subject.
104111
Worker worker = new Worker() {
@@ -137,7 +144,7 @@ public Subscription schedule(final Action0 action) {
137144
};
138145

139146
// enqueue the completable that process actions put in reply subject
140-
workerQueue.onNext(completable);
147+
workerQueue.onNext(actions);
141148

142149
// return the worker that adds actions to the reply subject
143150
return worker;

src/test/java/rx/schedulers/ScheduleWhenTest.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,15 @@ public Observable<Long> call(Integer t) {
4242
public void testMaxConcurrent() {
4343
TestScheduler tSched = new TestScheduler();
4444

45-
Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
45+
Scheduler sched = new ScheduleWhen(new Func1<Observable<Observable<Completable>>, Completable>() {
4646
@Override
47-
public Completable call(Observable<Completable> workers) {
47+
public Completable call(Observable<Observable<Completable>> workerActions) {
48+
Observable<Completable> workers = workerActions.map(new Func1<Observable<Completable>, Completable>() {
49+
@Override
50+
public Completable call(Observable<Completable> actions) {
51+
return Completable.concat(actions);
52+
}
53+
});
4854
return Completable.merge(workers, 2);
4955
}
5056
}, tSched);
@@ -70,11 +76,16 @@ public Completable call(Observable<Completable> workers) {
7076
public void testDelaySubscription() {
7177
final TestScheduler tSched = new TestScheduler();
7278

73-
Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
79+
Scheduler sched = new ScheduleWhen(new Func1<Observable<Observable<Completable>>, Completable>() {
7480
@Override
75-
public Completable call(Observable<Completable> workers) {
76-
return Completable.concat(workers.map(new Func1<Completable, Completable>() {
81+
public Completable call(Observable<Observable<Completable>> workerActions) {
82+
Observable<Completable> workers = workerActions.map(new Func1<Observable<Completable>, Completable>() {
7783
@Override
84+
public Completable call(Observable<Completable> actions) {
85+
return Completable.concat(actions);
86+
}
87+
});
88+
return Completable.concat(workers.map(new Func1<Completable, Completable>() {
7889
public Completable call(Completable worker) {
7990
return worker.delay(1, TimeUnit.SECONDS, tSched);
8091
}

0 commit comments

Comments
 (0)