-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Optimized observeOn/subscribeOn #2603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,11 +27,28 @@ | |
import java.util.concurrent.ThreadFactory; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/* package */class EventLoopsScheduler extends Scheduler { | ||
public class EventLoopsScheduler extends Scheduler { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes this part of the public API. Why? For example: http://reactivex.io/RxJava/javadoc/rx/schedulers/package-frame.html There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the type needs to be accessible from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But since EventLoopsScheduler was package private, it could be moved into |
||
/** Manages a fixed number of workers. */ | ||
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-"; | ||
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX); | ||
|
||
/** | ||
* Key to setting the maximum number of computation scheduler threads. | ||
* Zero or less is interpreted as use available. Capped by available. | ||
*/ | ||
static final String KEY_MAX_THREADS = "rx.scheduler.max-computation-threads"; | ||
/** The maximum number of computation scheduler threads. */ | ||
static final int MAX_THREADS; | ||
static { | ||
int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0); | ||
int ncpu = Runtime.getRuntime().availableProcessors(); | ||
int max; | ||
if (maxThreads <= 0 || maxThreads > ncpu) { | ||
max = ncpu; | ||
} else { | ||
max = maxThreads; | ||
} | ||
MAX_THREADS = max; | ||
} | ||
static final class FixedSchedulerPool { | ||
final int cores; | ||
|
||
|
@@ -40,7 +57,7 @@ static final class FixedSchedulerPool { | |
|
||
FixedSchedulerPool() { | ||
// initialize event loops | ||
this.cores = Runtime.getRuntime().availableProcessors(); | ||
this.cores = MAX_THREADS; | ||
this.eventLoops = new PoolWorker[cores]; | ||
for (int i = 0; i < cores; i++) { | ||
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY); | ||
|
@@ -68,6 +85,17 @@ public Worker createWorker() { | |
return new EventLoopWorker(pool.getEventLoop()); | ||
} | ||
|
||
/** | ||
* Schedules the action directly on one of the event loop workers | ||
* without the additional infrastructure and checking. | ||
* @param action the action to schedule | ||
* @return the subscription | ||
*/ | ||
public Subscription scheduleDirect(Action0 action) { | ||
PoolWorker pw = pool.getEventLoop(); | ||
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS); | ||
} | ||
|
||
private static class EventLoopWorker extends Scheduler.Worker { | ||
private final CompositeSubscription innerSubscription = new CompositeSubscription(); | ||
private final PoolWorker poolWorker; | ||
|
@@ -110,4 +138,4 @@ private static final class PoolWorker extends NewThreadWorker { | |
super(threadFactory); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, what benefit does this give to assign this reference? Why use
counter
instead ofCOUNTER_UPDATER
directly?Same for the other assignments here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Due to the volatile access in the loop, the JIT optimization that would hoist them into registers is not allowed (i.e., field access couldn't be moved before a volatile read) and would just re-read them all the time.