Skip to content

Commit 8e2082c

Browse files
committed
Ability to create custom schedulers with behavior based on composing operators.
• rewrote the test to get a little more coverage. • wrapping each of the onNext/onCompleted to ensure no overlapping calls • Break up the Worker as Completable into Worker as Observable. Now the schedules actions are the indivisible elements that are subscribed to. The user has additional choice at the cost of making the API more complicated.
1 parent cec8915 commit 8e2082c

File tree

2 files changed

+460
-0
lines changed

2 files changed

+460
-0
lines changed
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,263 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Completable;
23+
import rx.Completable.CompletableOnSubscribe;
24+
import rx.Completable.CompletableSubscriber;
25+
import rx.Observable;
26+
import rx.Scheduler;
27+
import rx.Subscription;
28+
import rx.annotations.Experimental;
29+
import rx.functions.Action0;
30+
import rx.functions.Func1;
31+
import rx.internal.operators.BufferUntilSubscriber;
32+
import rx.subjects.PublishSubject;
33+
import rx.subscriptions.Subscriptions;
34+
35+
/**
36+
* Allows the use of operators for controlling the timing around when actions
37+
* scheduled on workers are actually done. This makes it possible to layer
38+
* additional behavior on an existing {@link Scheduler}. The main parameter is a
39+
* function that flattens an {@link Observable} of {@link Completable}s into
40+
* just one {@link Completable}. There must be a chain of operators connecting
41+
* the returned value to the source {@link Observable} otherwise any work
42+
* scheduled on the {@link Scheduler} will not be executed.
43+
* <p>
44+
* When {@link Scheduler#createWorker()} is invoked a {@link Completable} is
45+
* onNext'd to the combinator to be flattened. If the {@link Completable} is not
46+
* immediately subscribed to an calls to {@link Worker#schedule} are buffered.
47+
* Once the {@link Completable} is subscribed to actions are then scheduled on the
48+
* actual {@link Scheduler}. When the {@link Worker} is unsubscribed the
49+
* {@link Completable} emits an onComplete and triggers any behavior in the
50+
* flattening operator. The {@link Observable} and all {@link Completable}s give
51+
* to the flattening function never onError.
52+
* <p>
53+
* Slowing down the rate to no more than than 1 a second.
54+
*
55+
* <pre>
56+
* Scheduler slowSched = new ScheduleWhen(workers -> {
57+
* // use concatenate to make each worker happen one at a time.
58+
* return Completable.concat(workers.map(worker -> {
59+
* // delay the starting of the next worker by 1 second.
60+
* return worker.delay(1, TimeUnit.SECONDS);
61+
* }));
62+
* }, Schedulers.computation());
63+
* </pre>
64+
* <p>
65+
* Limit the amount concurrency two at a time without creating a new fix size
66+
* thread pool:
67+
*
68+
* <pre>
69+
* Scheduler limitSched = new ScheduleWhen(workers -> {
70+
* // use merge max concurrent to two at a time
71+
* return Completable.merge(workers, 2);
72+
* }, Schedulers.computation());
73+
* </pre>
74+
*/
75+
@Experimental
76+
public class ScheduleWhen extends Scheduler {
77+
private final Scheduler actualScheduler;
78+
private final PublishSubject<Observable<Completable>> workerQueue;
79+
80+
public ScheduleWhen(Func1<Observable<Observable<Completable>>, Completable> combine, Scheduler actualScheduler) {
81+
this.actualScheduler = actualScheduler;
82+
// workers are converted into completables and put in this queue.
83+
this.workerQueue = PublishSubject.create();
84+
// send it to a custom combinator to pick the order and rate at which
85+
// workers are processed.
86+
combine.call(workerQueue.onBackpressureBuffer()).subscribe();
87+
}
88+
89+
@Override
90+
public Worker createWorker() {
91+
final Worker actualWorker = actualScheduler.createWorker();
92+
// a queue for the actions submitted while worker is waiting to get to
93+
// the subscribe to off the workerQueue.
94+
final BufferUntilSubscriber<ScheduledAction> actionQueue = BufferUntilSubscriber.<ScheduledAction>create();
95+
// convert the work of scheduling all the actions into a completable
96+
Observable<Completable> actions = actionQueue.map(new Func1<ScheduledAction, Completable>() {
97+
@Override
98+
public Completable call(final ScheduledAction action) {
99+
return Completable.create(new CompletableOnSubscribe() {
100+
@Override
101+
public void call(CompletableSubscriber actionCompletable) {
102+
actionCompletable.onSubscribe(action);
103+
action.call(actualWorker);
104+
actionCompletable.onCompleted();
105+
}
106+
});
107+
}
108+
});
109+
110+
// a worker that queues the action to the actionQueue subject.
111+
Worker worker = new Worker() {
112+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
113+
114+
@Override
115+
public void unsubscribe() {
116+
// complete the actionQueue when worker is unsubscribed to make
117+
// room for the next worker in the workerQueue.
118+
if (unsubscribed.compareAndSet(false, true)) {
119+
actualWorker.unsubscribe();
120+
synchronized (actionQueue) {
121+
actionQueue.onCompleted();
122+
}
123+
}
124+
}
125+
126+
@Override
127+
public boolean isUnsubscribed() {
128+
return unsubscribed.get();
129+
}
130+
131+
@Override
132+
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
133+
// send a scheduled action to the actionQueue
134+
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
135+
synchronized (actionQueue) {
136+
actionQueue.onNext(delayedAction);
137+
}
138+
return delayedAction;
139+
}
140+
141+
@Override
142+
public Subscription schedule(final Action0 action) {
143+
// send a scheduled action to the actionQueue
144+
ImmediateAction immediateAction = new ImmediateAction(action);
145+
synchronized (actionQueue) {
146+
actionQueue.onNext(immediateAction);
147+
}
148+
return immediateAction;
149+
}
150+
};
151+
152+
// enqueue the completable that process actions put in reply subject
153+
synchronized (workerQueue) {
154+
workerQueue.onNext(actions);
155+
}
156+
157+
// return the worker that adds actions to the reply subject
158+
return worker;
159+
}
160+
161+
private static final Subscription SUBSCRIBED = new Subscription() {
162+
@Override
163+
public void unsubscribe() {
164+
}
165+
166+
@Override
167+
public boolean isUnsubscribed() {
168+
return false;
169+
}
170+
};
171+
172+
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
173+
174+
@SuppressWarnings("serial")
175+
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {
176+
public ScheduledAction() {
177+
super(SUBSCRIBED);
178+
}
179+
180+
private final void call(Worker actualWorker) {
181+
Subscription oldState = get();
182+
// either SUBSCRIBED or UNSUBSCRIBED
183+
if (oldState == UNSUBSCRIBED) {
184+
// no need to schedule return
185+
return;
186+
}
187+
if (oldState != SUBSCRIBED) {
188+
// has already been scheduled return
189+
// should not be able to get here but handle it anyway by not
190+
// rescheduling.
191+
return;
192+
}
193+
194+
Subscription newState = callActual(actualWorker);
195+
196+
if (!compareAndSet(SUBSCRIBED, newState)) {
197+
// set would only fail if the new current state is some other
198+
// subscription from a concurrent call to this method.
199+
// Unsubscribe from the action just scheduled because it lost
200+
// the race.
201+
newState.unsubscribe();
202+
}
203+
}
204+
205+
protected abstract Subscription callActual(Worker actualWorker);
206+
207+
@Override
208+
public boolean isUnsubscribed() {
209+
return get().isUnsubscribed();
210+
}
211+
212+
@Override
213+
public void unsubscribe() {
214+
Subscription oldState;
215+
// no matter what the current state is the new state is going to be
216+
Subscription newState = UNSUBSCRIBED;
217+
do {
218+
oldState = get();
219+
if (oldState == UNSUBSCRIBED) {
220+
// the action has already been unsubscribed
221+
return;
222+
}
223+
} while (!compareAndSet(oldState, newState));
224+
225+
if (oldState != SUBSCRIBED) {
226+
// the action was scheduled. stop it.
227+
oldState.unsubscribe();
228+
}
229+
}
230+
}
231+
232+
@SuppressWarnings("serial")
233+
private static class ImmediateAction extends ScheduledAction {
234+
private final Action0 action;
235+
236+
public ImmediateAction(Action0 action) {
237+
this.action = action;
238+
}
239+
240+
@Override
241+
protected Subscription callActual(Worker actualWorker) {
242+
return actualWorker.schedule(action);
243+
}
244+
}
245+
246+
@SuppressWarnings("serial")
247+
private static class DelayedAction extends ScheduledAction {
248+
private final Action0 action;
249+
private final long delayTime;
250+
private final TimeUnit unit;
251+
252+
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
253+
this.action = action;
254+
this.delayTime = delayTime;
255+
this.unit = unit;
256+
}
257+
258+
@Override
259+
protected Subscription callActual(Worker actualWorker) {
260+
return actualWorker.schedule(action, delayTime, unit);
261+
}
262+
}
263+
}

0 commit comments

Comments
 (0)