-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Optimized observeOn/subscribeOn #2603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,11 @@ | |
package rx.internal.util; | ||
|
||
import rx.Observable; | ||
import rx.Scheduler; | ||
import rx.Subscriber; | ||
import rx.Scheduler.Worker; | ||
import rx.functions.Action0; | ||
import rx.schedulers.EventLoopsScheduler; | ||
|
||
public final class ScalarSynchronousObservable<T> extends Observable<T> { | ||
|
||
|
@@ -49,5 +53,81 @@ public void call(Subscriber<? super T> s) { | |
public T get() { | ||
return t; | ||
} | ||
|
||
// @Override | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are these commented out methods intended to be here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've experimented with disabling the |
||
// public Observable<T> subscribeOn(Scheduler scheduler) { | ||
// return scalarScheduleOn(scheduler); | ||
// } | ||
// | ||
// @Override | ||
// public Observable<T> observeOn(Scheduler scheduler) { | ||
// return scalarScheduleOn(scheduler); | ||
// } | ||
|
||
/** | ||
* Customized observeOn/subscribeOn implementation which emits the scalar | ||
* value directly or with less overhead on the specified scheduler. | ||
* @param scheduler the target scheduler | ||
* @return the new observable | ||
*/ | ||
public Observable<T> scalarScheduleOn(Scheduler scheduler) { | ||
if (scheduler instanceof EventLoopsScheduler) { | ||
EventLoopsScheduler es = (EventLoopsScheduler) scheduler; | ||
return create(new DirectScheduledEmission<T>(es, t)); | ||
} | ||
return create(new NormalScheduledEmission<T>(scheduler, t)); | ||
} | ||
|
||
/** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */ | ||
static final class DirectScheduledEmission<T> implements OnSubscribe<T> { | ||
private final EventLoopsScheduler es; | ||
private final T value; | ||
DirectScheduledEmission(EventLoopsScheduler es, T value) { | ||
this.es = es; | ||
this.value = value; | ||
} | ||
@Override | ||
public void call(final Subscriber<? super T> child) { | ||
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value))); | ||
} | ||
} | ||
/** Emits a scalar value on a general scheduler. */ | ||
static final class NormalScheduledEmission<T> implements OnSubscribe<T> { | ||
private final Scheduler scheduler; | ||
private final T value; | ||
|
||
NormalScheduledEmission(Scheduler scheduler, T value) { | ||
this.scheduler = scheduler; | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public void call(final Subscriber<? super T> subscriber) { | ||
Worker worker = scheduler.createWorker(); | ||
subscriber.add(worker); | ||
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value)); | ||
} | ||
} | ||
/** Action that emits a single value when called. */ | ||
private static final class ScalarSynchronousAction<T> implements Action0 { | ||
private final Subscriber<? super T> subscriber; | ||
private final T value; | ||
|
||
private ScalarSynchronousAction(Subscriber<? super T> subscriber, | ||
T value) { | ||
this.subscriber = subscriber; | ||
this.value = value; | ||
} | ||
|
||
@Override | ||
public void call() { | ||
try { | ||
subscriber.onNext(value); | ||
} catch (Throwable t) { | ||
subscriber.onError(t); | ||
return; | ||
} | ||
subscriber.onCompleted(); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ | |
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/* package */class EventLoopsScheduler extends Scheduler { | ||
public class EventLoopsScheduler extends Scheduler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes this part of the public API. Why? For example: http://reactivex.io/RxJava/javadoc/rx/schedulers/package-frame.html There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the type needs to be accessible from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But since EventLoopsScheduler was package private, it could be moved into |
||
/** Manages a fixed number of workers. */ | ||
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-"; | ||
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); | ||
|
@@ -67,6 +67,17 @@ public PoolWorker getEventLoop() { | |
public Worker createWorker() { | ||
return new EventLoopWorker(pool.getEventLoop()); | ||
} | ||
|
||
/** | ||
* Schedules the action directly on one of the event loop workers | ||
* without the additional infrastructure and checking. | ||
* @param action the action to schedule | ||
* @return the subscription | ||
*/ | ||
public Subscription scheduleDirect(Action0 action) { | ||
PoolWorker pw = pool.getEventLoop(); | ||
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS); | ||
} | ||
|
||
private static class EventLoopWorker extends Scheduler.Worker { | ||
private final CompositeSubscription innerSubscription = new CompositeSubscription(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, what benefit does this give to assign this reference? Why use
counter
instead ofCOUNTER_UPDATER
directly?Same for the other assignments here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to the volatile access in the loop, the JIT optimization that would hoist them into registers is not allowed (i.e., field access couldn't be moved before a volatile read) and would just re-read them all the time.