Skip to content

Commit f15a712

Browse files
committed
Ability to create custom schedulers with behavior based on composing operators.
1 parent cec8915 commit f15a712

File tree

2 files changed

+367
-0
lines changed

2 files changed

+367
-0
lines changed
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Completable;
23+
import rx.Observable;
24+
import rx.Scheduler;
25+
import rx.Subscription;
26+
import rx.annotations.Experimental;
27+
import rx.functions.Action0;
28+
import rx.functions.Action1;
29+
import rx.functions.Func1;
30+
import rx.internal.operators.BufferUntilSubscriber;
31+
import rx.subjects.PublishSubject;
32+
import rx.subscriptions.Subscriptions;
33+
34+
/**
35+
* Allows the use of operators for controlling the timing around when actions
36+
* scheduled on workers are actually done. This makes it possible to layer
37+
* additional behavior on an existing {@link Scheduler}. The main parameter is a
38+
* function that flattens an {@link Observable} of {@link Completable}s into
39+
* just one {@link Completable}. There must be a chain of operators connecting
40+
* the returned value to the source {@link Observable} otherwise any work
41+
* scheduled on the {@link Scheduler} will not be executed.
42+
* <p>
43+
* When {@link Scheduler#createWorker()} is invoked a {@link Completable} is
44+
* onNext'd to the combinator to be flattened. If the {@link Completable} is not
45+
* immediately subscribed to an calls to {@link Worker#schedule} are buffered.
46+
* Once the {@link Completable} is subscribed to actions are then scheduled on the
47+
* actual {@link Scheduler}. When the {@link Worker} is unsubscribed the
48+
* {@link Completable} emits an onComplete and triggers any behavior in the
49+
* flattening operator. The {@link Observable} and all {@link Completable}s give
50+
* to the flattening function never onError.
51+
* <p>
52+
* Slowing down the rate to no more than than 1 a second.
53+
*
54+
* <pre>
55+
* Scheduler slowSched = new ScheduleWhen(workers -> {
56+
* // use concatenate to make each worker happen one at a time.
57+
* return Completable.concat(workers.map(worker -> {
58+
* // delay the starting of the next worker by 1 second.
59+
* return worker.delay(1, TimeUnit.SECONDS);
60+
* }));
61+
* }, Schedulers.computation());
62+
* </pre>
63+
* <p>
64+
* Limit the amount concurrency two at a time without creating a new fix size
65+
* thread pool:
66+
*
67+
* <pre>
68+
* Scheduler limitSched = new ScheduleWhen(workers -> {
69+
* // use merge max concurrent to two at a time
70+
* return Completable.merge(workers, 2);
71+
* }, Schedulers.computation());
72+
* </pre>
73+
*/
74+
@Experimental
75+
public class ScheduleWhen extends Scheduler {
76+
private final Scheduler actualScheduler;
77+
private final PublishSubject<Completable> workerQueue;
78+
79+
public ScheduleWhen(Func1<Observable<Completable>, Completable> combine, Scheduler actualScheduler) {
80+
this.actualScheduler = actualScheduler;
81+
// workers are converted into completables and put in this queue.
82+
this.workerQueue = PublishSubject.create();
83+
// send it to a custom combinator to pick the order and rate at which
84+
// workers are processed.
85+
combine.call(workerQueue.onBackpressureBuffer()).subscribe();
86+
}
87+
88+
@Override
89+
public Worker createWorker() {
90+
// a queue for the actions submitted while worker is waiting to get to
91+
// the subscribe to off the workerQueue.
92+
final BufferUntilSubscriber<ScheduledAction> actionQueue = BufferUntilSubscriber.create();
93+
final Worker actualWorker = actualScheduler.createWorker();
94+
95+
// convert the work of scheduling all the actions into a completable
96+
Completable completable = actionQueue.doOnNext(new Action1<ScheduledAction>() {
97+
@Override
98+
public void call(ScheduledAction action) {
99+
action.call(actualWorker);
100+
}
101+
}).toCompletable();
102+
103+
// a worker that queues the action to the actionQueue subject.
104+
Worker worker = new Worker() {
105+
private final AtomicBoolean unsubscribed = new AtomicBoolean();
106+
107+
@Override
108+
public void unsubscribe() {
109+
// complete the actionQueue when worker is unsubscribed to make
110+
// room for the next worker in the workerQueue.
111+
if (unsubscribed.compareAndSet(false, true)) {
112+
actualWorker.unsubscribe();
113+
actionQueue.onCompleted();
114+
}
115+
}
116+
117+
@Override
118+
public boolean isUnsubscribed() {
119+
return unsubscribed.get();
120+
}
121+
122+
@Override
123+
public Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit) {
124+
// send a scheduled action to the actionQueue
125+
DelayedAction delayedAction = new DelayedAction(action, delayTime, unit);
126+
actionQueue.onNext(delayedAction);
127+
return delayedAction;
128+
}
129+
130+
@Override
131+
public Subscription schedule(final Action0 action) {
132+
// send a scheduled action to the actionQueue
133+
ImmediateAction immediateAction = new ImmediateAction(action);
134+
actionQueue.onNext(immediateAction);
135+
return immediateAction;
136+
}
137+
};
138+
139+
// enqueue the completable that process actions put in reply subject
140+
workerQueue.onNext(completable);
141+
142+
// return the worker that adds actions to the reply subject
143+
return worker;
144+
}
145+
146+
private static final Subscription SUBSCRIBED = new Subscription() {
147+
@Override
148+
public void unsubscribe() {
149+
}
150+
151+
@Override
152+
public boolean isUnsubscribed() {
153+
return false;
154+
}
155+
};
156+
157+
private static final Subscription UNSUBSCRIBED = Subscriptions.unsubscribed();
158+
159+
@SuppressWarnings("serial")
160+
private static abstract class ScheduledAction extends AtomicReference<Subscription> implements Subscription {
161+
public ScheduledAction() {
162+
super(SUBSCRIBED);
163+
}
164+
165+
private final void call(Worker actualWorker) {
166+
Subscription oldState = get();
167+
// either SUBSCRIBED or UNSUBSCRIBED
168+
if (oldState == UNSUBSCRIBED) {
169+
// no need to schedule return
170+
return;
171+
}
172+
if (oldState != SUBSCRIBED) {
173+
// has already been scheduled return
174+
// should not be able to get here but handle it anyway by not
175+
// rescheduling.
176+
return;
177+
}
178+
179+
Subscription newState = callActual(actualWorker);
180+
181+
if (!compareAndSet(SUBSCRIBED, newState)) {
182+
// set would only fail if the new current state is some other
183+
// subscription from a concurrent call to this method.
184+
// Unsubscribe from the action just scheduled because it lost
185+
// the race.
186+
newState.unsubscribe();
187+
}
188+
}
189+
190+
protected abstract Subscription callActual(Worker actualWorker);
191+
192+
@Override
193+
public boolean isUnsubscribed() {
194+
return get().isUnsubscribed();
195+
}
196+
197+
@Override
198+
public void unsubscribe() {
199+
Subscription oldState;
200+
// no matter what the current state is the new state is going to be
201+
Subscription newState = UNSUBSCRIBED;
202+
do {
203+
oldState = get();
204+
if (oldState == UNSUBSCRIBED) {
205+
// the action has already been unsubscribed
206+
return;
207+
}
208+
} while (!compareAndSet(oldState, newState));
209+
210+
if (oldState != SUBSCRIBED) {
211+
// the action was scheduled. stop it.
212+
oldState.unsubscribe();
213+
}
214+
}
215+
}
216+
217+
@SuppressWarnings("serial")
218+
private static class ImmediateAction extends ScheduledAction {
219+
private final Action0 action;
220+
221+
public ImmediateAction(Action0 action) {
222+
this.action = action;
223+
}
224+
225+
@Override
226+
protected Subscription callActual(Worker actualWorker) {
227+
return actualWorker.schedule(action);
228+
}
229+
}
230+
231+
@SuppressWarnings("serial")
232+
private static class DelayedAction extends ScheduledAction {
233+
private final Action0 action;
234+
private final long delayTime;
235+
private final TimeUnit unit;
236+
237+
public DelayedAction(Action0 action, long delayTime, TimeUnit unit) {
238+
this.action = action;
239+
this.delayTime = delayTime;
240+
this.unit = unit;
241+
}
242+
243+
@Override
244+
protected Subscription callActual(Worker actualWorker) {
245+
return actualWorker.schedule(action, delayTime, unit);
246+
}
247+
}
248+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.schedulers;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import org.junit.Test;
21+
22+
import rx.Completable;
23+
import rx.Observable;
24+
import rx.Scheduler;
25+
import rx.functions.Func1;
26+
import rx.observers.TestSubscriber;
27+
28+
public class ScheduleWhenTest {
29+
/**
30+
* using the same observable chain but with difference custom schedulers.
31+
*/
32+
private Observable<Long> work(Scheduler sched) {
33+
return Observable.range(1, 5).flatMap(new Func1<Integer, Observable<Long>>() {
34+
@Override
35+
public Observable<Long> call(Integer t) {
36+
return Observable.timer(1, TimeUnit.SECONDS, sched);
37+
}
38+
});
39+
}
40+
41+
@Test
42+
public void testMaxConcurrent() {
43+
TestScheduler tSched = new TestScheduler();
44+
45+
Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
46+
@Override
47+
public Completable call(Observable<Completable> workers) {
48+
return Completable.merge(workers, 2);
49+
}
50+
}, tSched);
51+
52+
TestSubscriber<Long> tSub = TestSubscriber.create();
53+
54+
work(sched).subscribe(tSub);
55+
56+
tSub.assertValueCount(0);
57+
58+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
59+
tSub.assertValueCount(2);
60+
61+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
62+
tSub.assertValueCount(4);
63+
64+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
65+
tSub.assertValueCount(5);
66+
tSub.assertCompleted();
67+
}
68+
69+
@Test
70+
public void testDelaySubscription() {
71+
TestScheduler tSched = new TestScheduler();
72+
73+
Scheduler sched = new ScheduleWhen(new Func1<Observable<Completable>, Completable>() {
74+
@Override
75+
public Completable call(Observable<Completable> workers) {
76+
return Completable.concat(workers.map(new Func1<Completable, Completable>() {
77+
@Override
78+
public Completable call(Completable worker) {
79+
return worker.delay(1, TimeUnit.SECONDS, tSched);
80+
}
81+
}));
82+
}
83+
}, tSched);
84+
85+
TestSubscriber<Long> tSub = TestSubscriber.create();
86+
87+
work(sched).subscribe(tSub);
88+
89+
tSub.assertValueCount(0);
90+
91+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
92+
tSub.assertValueCount(1);
93+
94+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
95+
tSub.assertValueCount(1);
96+
97+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
98+
tSub.assertValueCount(2);
99+
100+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
101+
tSub.assertValueCount(2);
102+
103+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
104+
tSub.assertValueCount(3);
105+
106+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
107+
tSub.assertValueCount(3);
108+
109+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
110+
tSub.assertValueCount(4);
111+
112+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
113+
tSub.assertValueCount(4);
114+
115+
tSched.advanceTimeBy(1, TimeUnit.SECONDS);
116+
tSub.assertValueCount(5);
117+
tSub.assertCompleted();
118+
}
119+
}

0 commit comments

Comments
 (0)