Skip to content

Optimized observeOn/subscribeOn #2603

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5036,6 +5036,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
* @see #subscribeOn
*/
public final Observable<T> observeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return lift(new OperatorObserveOn<T>(scheduler));
}

Expand Down Expand Up @@ -7455,6 +7458,9 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
* @see #observeOn
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

Expand Down
48 changes: 32 additions & 16 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {

/** Observe through individual queue per observer. */
private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
private final class PollQueueAction implements Action0 {
@Override
public void call() {
pollQueue();
}
}

final Subscriber<? super T> child;
private final Scheduler.Worker recursiveScheduler;
private final ScheduledUnsubscribe scheduledUnsubscribe;
Expand All @@ -81,6 +88,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> {
volatile long counter;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ObserveOnSubscriber> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter");
private Action0 pollQueueAction;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
Expand Down Expand Up @@ -148,31 +156,39 @@ public void onError(final Throwable e) {

protected void schedule() {
if (COUNTER_UPDATER.getAndIncrement(this) == 0) {
recursiveScheduler.schedule(new Action0() {

@Override
public void call() {
pollQueue();
}

});
Action0 a = pollQueueAction;
if (a == null) {
a = new PollQueueAction();
pollQueueAction = a;
}
recursiveScheduler.schedule(a);
}
}

// only execute this from schedule()
private void pollQueue() {
int emitted = 0;

final ScheduledUnsubscribe u = scheduledUnsubscribe;
final RxRingBuffer q = queue;
final Subscriber<? super T> child = this.child;
final NotificationLite<T> on = this.on;
@SuppressWarnings("rawtypes")
final AtomicLongFieldUpdater<ObserveOnSubscriber> counter = COUNTER_UPDATER;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, what benefit does this give to assign this reference? Why use counter instead of COUNTER_UPDATER directly?

Same for the other assignments here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the volatile access in the loop, the JIT optimization that would hoist them into registers is not allowed (i.e., field access couldn't be moved before a volatile read) and would just re-read them all the time.

@SuppressWarnings("rawtypes")
final AtomicLongFieldUpdater<ObserveOnSubscriber> req = REQUESTED;

do {
/*
* Set to 1 otherwise it could have grown very large while in the last poll loop
* and then we can end up looping all those times again here before exiting even once we've drained
*/
COUNTER_UPDATER.set(this, 1);
counter.lazySet(this, 1);

while (!scheduledUnsubscribe.isUnsubscribed()) {
while (!u.isUnsubscribed()) {
if (failure) {
// special handling to short-circuit an error propagation
Object o = queue.poll();
Object o = q.poll();
// completed so we will skip onNext if they exist and only emit terminal events
if (on.isError(o)) {
// only emit error
Expand All @@ -181,11 +197,11 @@ private void pollQueue() {
return;
}
} else {
if (REQUESTED.getAndDecrement(this) != 0) {
Object o = queue.poll();
if (req.getAndDecrement(this) != 0) {
Object o = q.poll();
if (o == null) {
// nothing in queue
REQUESTED.incrementAndGet(this);
req.incrementAndGet(this);
break;
} else {
if (!on.accept(child, o)) {
Expand All @@ -195,12 +211,12 @@ private void pollQueue() {
}
} else {
// we hit the end ... so increment back to 0 again
REQUESTED.incrementAndGet(this);
req.incrementAndGet(this);
break;
}
}
}
} while (COUNTER_UPDATER.decrementAndGet(this) > 0);
} while (counter.decrementAndGet(this) > 0);

// request the number of items that we emitted in this poll loop
if (emitted > 0) {
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
package rx.internal.schedulers;

import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.*;

import rx.Subscription;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.internal.util.SubscriptionList;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.CompositeSubscription;

Expand All @@ -32,12 +32,12 @@
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final CompositeSubscription cancel;
final SubscriptionList cancel;
final Action0 action;

public ScheduledAction(Action0 action) {
this.action = action;
this.cancel = new CompositeSubscription();
this.cancel = new SubscriptionList();
}

@Override
Expand Down
70 changes: 70 additions & 0 deletions src/main/java/rx/internal/util/ScalarSynchronousObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
package rx.internal.util;

import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.schedulers.EventLoopsScheduler;

public final class ScalarSynchronousObservable<T> extends Observable<T> {

Expand Down Expand Up @@ -50,4 +54,70 @@ public T get() {
return t;
}

/**
* Customized observeOn/subscribeOn implementation which emits the scalar
* value directly or with less overhead on the specified scheduler.
* @param scheduler the target scheduler
* @return the new observable
*/
public Observable<T> scalarScheduleOn(Scheduler scheduler) {
if (scheduler instanceof EventLoopsScheduler) {
EventLoopsScheduler es = (EventLoopsScheduler) scheduler;
return create(new DirectScheduledEmission<T>(es, t));
}
return create(new NormalScheduledEmission<T>(scheduler, t));
}

/** Optimized observeOn for scalar value observed on the EventLoopsScheduler. */
static final class DirectScheduledEmission<T> implements OnSubscribe<T> {
private final EventLoopsScheduler es;
private final T value;
DirectScheduledEmission(EventLoopsScheduler es, T value) {
this.es = es;
this.value = value;
}
@Override
public void call(final Subscriber<? super T> child) {
child.add(es.scheduleDirect(new ScalarSynchronousAction<T>(child, value)));
}
}
/** Emits a scalar value on a general scheduler. */
static final class NormalScheduledEmission<T> implements OnSubscribe<T> {
private final Scheduler scheduler;
private final T value;

NormalScheduledEmission(Scheduler scheduler, T value) {
this.scheduler = scheduler;
this.value = value;
}

@Override
public void call(final Subscriber<? super T> subscriber) {
Worker worker = scheduler.createWorker();
subscriber.add(worker);
worker.schedule(new ScalarSynchronousAction<T>(subscriber, value));
}
}
/** Action that emits a single value when called. */
private static final class ScalarSynchronousAction<T> implements Action0 {
private final Subscriber<? super T> subscriber;
private final T value;

private ScalarSynchronousAction(Subscriber<? super T> subscriber,
T value) {
this.subscriber = subscriber;
this.value = value;
}

@Override
public void call() {
try {
subscriber.onNext(value);
} catch (Throwable t) {
subscriber.onError(t);
return;
}
subscriber.onCompleted();
}
}
}
45 changes: 22 additions & 23 deletions src/main/java/rx/internal/util/SubscriptionList.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
public final class SubscriptionList implements Subscription {

private List<Subscription> subscriptions;
private boolean unsubscribed = false;
private volatile boolean unsubscribed = false;

public SubscriptionList() {
}
Expand All @@ -42,7 +42,7 @@ public SubscriptionList(final Subscription... subscriptions) {
}

@Override
public synchronized boolean isUnsubscribed() {
public boolean isUnsubscribed() {
return unsubscribed;
}

Expand All @@ -55,21 +55,18 @@ public synchronized boolean isUnsubscribed() {
* the {@link Subscription} to add
*/
public void add(final Subscription s) {
Subscription unsubscribe = null;
synchronized (this) {
if (unsubscribed) {
unsubscribe = s;
} else {
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
if (!unsubscribed) {
synchronized (this) {
if (!unsubscribed) {
if (subscriptions == null) {
subscriptions = new LinkedList<Subscription>();
}
subscriptions.add(s);
return;
}
subscriptions.add(s);
}
}
if (unsubscribe != null) {
// call after leaving the synchronized block so we're not holding a lock while executing this
unsubscribe.unsubscribe();
}
s.unsubscribe();
}

/**
Expand All @@ -78,17 +75,19 @@ public void add(final Subscription s) {
*/
@Override
public void unsubscribe() {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
if (!unsubscribed) {
List<Subscription> list;
synchronized (this) {
if (unsubscribed) {
return;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
}
unsubscribed = true;
list = subscriptions;
subscriptions = null;
// we will only get here once
unsubscribeFromAll(list);
}
// we will only get here once
unsubscribeFromAll(list);
}

private static void unsubscribeFromAll(Collection<Subscription> subscriptions) {
Expand Down
36 changes: 32 additions & 4 deletions src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,28 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/* package */class EventLoopsScheduler extends Scheduler {
public class EventLoopsScheduler extends Scheduler {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes this part of the public API. Why?

For example: http://reactivex.io/RxJava/javadoc/rx/schedulers/package-frame.html

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the type needs to be accessible from rx.internal.util.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But since EventLoopsScheduler was package private, it could be moved into rx.internal.schedulers and be hidden from the public docs.

/** Manages a fixed number of workers. */
private static final String THREAD_NAME_PREFIX = "RxComputationThreadPool-";
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);

/**
* Key to setting the maximum number of computation scheduler threads.
* Zero or less is interpreted as use available. Capped by available.
*/
static final String KEY_MAX_THREADS = "rx.scheduler.max-computation-threads";
/** The maximum number of computation scheduler threads. */
static final int MAX_THREADS;
static {
int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0);
int ncpu = Runtime.getRuntime().availableProcessors();
int max;
if (maxThreads <= 0 || maxThreads > ncpu) {
max = ncpu;
} else {
max = maxThreads;
}
MAX_THREADS = max;
}
static final class FixedSchedulerPool {
final int cores;

Expand All @@ -40,7 +57,7 @@ static final class FixedSchedulerPool {

FixedSchedulerPool() {
// initialize event loops
this.cores = Runtime.getRuntime().availableProcessors();
this.cores = MAX_THREADS;
this.eventLoops = new PoolWorker[cores];
for (int i = 0; i < cores; i++) {
this.eventLoops[i] = new PoolWorker(THREAD_FACTORY);
Expand Down Expand Up @@ -68,6 +85,17 @@ public Worker createWorker() {
return new EventLoopWorker(pool.getEventLoop());
}

/**
* Schedules the action directly on one of the event loop workers
* without the additional infrastructure and checking.
* @param action the action to schedule
* @return the subscription
*/
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
}

private static class EventLoopWorker extends Scheduler.Worker {
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final PoolWorker poolWorker;
Expand Down Expand Up @@ -110,4 +138,4 @@ private static final class PoolWorker extends NewThreadWorker {
super(threadFactory);
}
}
}
}
Loading