Skip to content

Commit 8614754

Browse files
Merge pull request #2767 from akarnokd/ScalarScheduleOn
Optimized scalar observeOn/subscribeOn
2 parents 4778d00 + cea2d9d commit 8614754

File tree

4 files changed

+90
-7
lines changed

4 files changed

+90
-7
lines changed

src/main/java/rx/Observable.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5175,6 +5175,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
51755175
* @see #subscribeOn
51765176
*/
51775177
public final Observable<T> observeOn(Scheduler scheduler) {
5178+
if (this instanceof ScalarSynchronousObservable) {
5179+
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
5180+
}
51785181
return lift(new OperatorObserveOn<T>(scheduler));
51795182
}
51805183

@@ -7597,6 +7600,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
75977600
* @see #observeOn
75987601
*/
75997602
public final Observable<T> subscribeOn(Scheduler scheduler) {
7603+
if (this instanceof ScalarSynchronousObservable) {
7604+
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
7605+
}
76007606
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
76017607
}
76027608

src/main/java/rx/schedulers/EventLoopsScheduler.java renamed to src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,19 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package rx.schedulers;
16+
package rx.internal.schedulers;
1717

1818
import rx.Scheduler;
1919
import rx.Subscription;
2020
import rx.functions.Action0;
21-
import rx.internal.schedulers.NewThreadWorker;
22-
import rx.internal.schedulers.ScheduledAction;
2321
import rx.internal.util.RxThreadFactory;
2422
import rx.subscriptions.CompositeSubscription;
2523
import rx.subscriptions.Subscriptions;
2624

2725
import java.util.concurrent.ThreadFactory;
2826
import java.util.concurrent.TimeUnit;
2927

30-
/* package */class EventLoopsScheduler extends Scheduler {
28+
public class EventLoopsScheduler extends Scheduler {
3129
/** Manages a fixed number of workers. */
3230
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
3331
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
@@ -76,14 +74,25 @@ public PoolWorker getEventLoop() {
7674
* Create a scheduler with pool size equal to the available processor
7775
* count and using least-recent worker selection policy.
7876
*/
79-
EventLoopsScheduler() {
77+
public EventLoopsScheduler() {
8078
pool = new FixedSchedulerPool();
8179
}
8280

8381
@Override
8482
public Worker createWorker() {
8583
return new EventLoopWorker(pool.getEventLoop());
8684
}
85+
86+
/**
87+
* Schedules the action directly on one of the event loop workers
88+
* without the additional infrastructure and checking.
89+
* @param action the action to schedule
90+
* @return the subscription
91+
*/
92+
public Subscription scheduleDirect(Action0 action) {
93+
PoolWorker pw = pool.getEventLoop();
94+
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
95+
}
8796

8897
private static class EventLoopWorker extends Scheduler.Worker {
8998
private final CompositeSubscription innerSubscription = new CompositeSubscription();

src/main/java/rx/internal/util/ScalarSynchronousObservable.java

Lines changed: 69 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package rx.internal.util;
1717

18-
import rx.Observable;
19-
import rx.Subscriber;
18+
import rx.*;
19+
import rx.Scheduler.Worker;
20+
import rx.functions.Action0;
21+
import rx.internal.schedulers.EventLoopsScheduler;
2022

2123
public final class ScalarSynchronousObservable<T> extends Observable<T> {
2224

@@ -49,5 +51,70 @@ public void call(Subscriber<? super T> s) {
4951
public T get() {
5052
return t;
5153
}
54+
/**
55+
* Customized observeOn/subscribeOn implementation which emits the scalar
56+
* value directly or with less overhead on the specified scheduler.
57+
* @param scheduler the target scheduler
58+
* @return the new observable
59+
*/
60+
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
61+
if (scheduler instanceof EventLoopsScheduler) {
62+
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
63+
return create(new DirectScheduledEmission<T>(es, t));
64+
}
65+
return create(new NormalScheduledEmission<T>(scheduler, t));
66+
}
67+
68+
/** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */
69+
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
70+
private final EventLoopsScheduler es;
71+
private final T value;
72+
DirectScheduledEmission(EventLoopsScheduler es, T value) {
73+
this.es = es;
74+
this.value = value;
75+
}
76+
@Override
77+
public void call(final Subscriber<? super T> child) {
78+
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
79+
}
80+
}
81+
/** Emits a scalar value on a general scheduler. */
82+
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
83+
private final Scheduler scheduler;
84+
private final T value;
85+
86+
NormalScheduledEmission(Scheduler scheduler, T value) {
87+
this.scheduler = scheduler;
88+
this.value = value;
89+
}
90+
91+
@Override
92+
public void call(final Subscriber<? super T> subscriber) {
93+
Worker worker = scheduler.createWorker();
94+
subscriber.add(worker);
95+
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
96+
}
97+
}
98+
/** Action that emits a single value when called. */
99+
static final class ScalarSynchronousAction<T> implements Action0 {
100+
private final Subscriber<? super T> subscriber;
101+
private final T value;
52102

103+
private ScalarSynchronousAction(Subscriber<? super T> subscriber,
104+
T value) {
105+
this.subscriber = subscriber;
106+
this.value = value;
107+
}
108+
109+
@Override
110+
public void call() {
111+
try {
112+
subscriber.onNext(value);
113+
} catch (Throwable t) {
114+
subscriber.onError(t);
115+
return;
116+
}
117+
subscriber.onCompleted();
118+
}
119+
}
53120
}

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.schedulers;
1717

1818
import rx.Scheduler;
19+
import rx.internal.schedulers.EventLoopsScheduler;
1920
import rx.plugins.RxJavaPlugins;
2021

2122
import java.util.concurrent.Executor;

0 commit comments

Comments
 (0)