Skip to content

1.x: add optional tracking of worker creation sites + report it on error #3965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions src/main/java/rx/internal/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,16 @@ public void shutdown() {

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
Throwable site = null;
if (WorkerDebugSupport.isEnabled()) {
site = new RuntimeException("createWorker() called");
}
return new EventLoopWorker(pool.get(), site);
}

private static final class EventLoopWorker extends Scheduler.Worker {
private static final class EventLoopWorker extends Scheduler.Worker
implements WorkerCallback {
final Throwable site;
private final CompositeSubscription innerSubscription = new CompositeSubscription();
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
Expand All @@ -182,8 +188,9 @@ private static final class EventLoopWorker extends Scheduler.Worker {
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");

EventLoopWorker(CachedWorkerPool pool) {
EventLoopWorker(CachedWorkerPool pool, Throwable site) {
this.pool = pool;
this.site = site;
this.threadWorker = pool.get();
}

Expand Down Expand Up @@ -221,18 +228,31 @@ public void call() {
}
action.call();
}
}, delayTime, unit);
innerSubscription.add(s);
s.addParent(innerSubscription);
}, delayTime, unit, this);
return s;
}

@Override
public void add(ScheduledAction action) {
innerSubscription.add(action);
}

@Override
public void remove(ScheduledAction action) {
innerSubscription.remove(action);
}

@Override
public Throwable workerCreationSite() {
return site;
}
}

private static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;

ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
super(threadFactory, null);
this.expirationTime = 0L;
}

Expand Down
57 changes: 47 additions & 10 deletions src/main/java/rx/internal/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final class EventLoopsScheduler extends Scheduler implements SchedulerLif

static final PoolWorker SHUTDOWN_WORKER;
static {
SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE);
SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE, null);
SHUTDOWN_WORKER.unsubscribe();
}

Expand All @@ -60,7 +60,7 @@ static final class FixedSchedulerPool {
this.cores = maxThreads;
this.eventLoops = new PoolWorker[maxThreads];
for (int i = 0; i < maxThreads; i++) {
this.eventLoops[i] = new PoolWorker(threadFactory);
this.eventLoops[i] = new PoolWorker(threadFactory, null);
}
}

Expand Down Expand Up @@ -98,7 +98,11 @@ public EventLoopsScheduler(ThreadFactory threadFactory) {

@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get().getEventLoop());
Throwable site = null;
if (WorkerDebugSupport.isEnabled()) {
site = new RuntimeException("createWorker() called");
}
return new EventLoopWorker(pool.get().getEventLoop(), site);
}

@Override
Expand Down Expand Up @@ -131,18 +135,51 @@ public void shutdown() {
*/
public Subscription scheduleDirect(Action0 action) {
PoolWorker pw = pool.get().getEventLoop();
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, null);
}

private static class EventLoopWorker extends Scheduler.Worker {
private final SubscriptionList serial = new SubscriptionList();
private final CompositeSubscription timed = new CompositeSubscription();
private final SubscriptionList both = new SubscriptionList(serial, timed);
private final PoolWorker poolWorker;
final WorkerCallback timedCallback;
final WorkerCallback serialCallback;

EventLoopWorker(PoolWorker poolWorker) {
EventLoopWorker(PoolWorker poolWorker, final Throwable site) {
this.poolWorker = poolWorker;

this.timedCallback = new WorkerCallback() {
@Override
public void add(ScheduledAction action) {
timed.add(action);
}

@Override
public void remove(ScheduledAction action) {
timed.remove(action);
}

@Override
public Throwable workerCreationSite() {
return site;
}
};
this.serialCallback = new WorkerCallback() {
@Override
public void add(ScheduledAction action) {
serial.add(action);
}

@Override
public void remove(ScheduledAction action) {
serial.remove(action);
}

@Override
public Throwable workerCreationSite() {
return site;
}
};
}

@Override
Expand All @@ -169,7 +206,7 @@ public void call() {
}
action.call();
}
}, 0, null, serial);
}, 0, null, serialCallback);
}

@Override
Expand All @@ -186,13 +223,13 @@ public void call() {
}
action.call();
}
}, delayTime, unit, timed);
}, delayTime, unit, timedCallback);
}
}

static final class PoolWorker extends NewThreadWorker {
PoolWorker(ThreadFactory threadFactory) {
super(threadFactory);
PoolWorker(ThreadFactory threadFactory, Throwable site) {
super(threadFactory, site);
}
}
}
30 changes: 26 additions & 4 deletions src/main/java/rx/internal/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ public ExecutorScheduler(Executor executor) {

@Override
public Worker createWorker() {
return new ExecutorSchedulerWorker(executor);
Throwable site = null;
if (WorkerDebugSupport.isEnabled()) {
site = new RuntimeException("createWorker() called");
}
return new ExecutorSchedulerWorker(executor, site);
}

/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
static final class ExecutorSchedulerWorker extends Scheduler.Worker
implements Runnable, WorkerCallback {
final Executor executor;
// TODO: use a better performing structure for task tracking
final CompositeSubscription tasks;
Expand All @@ -51,20 +56,23 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R

final ScheduledExecutorService service;

public ExecutorSchedulerWorker(Executor executor) {
final Throwable site;

public ExecutorSchedulerWorker(Executor executor, Throwable site) {
this.executor = executor;
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
this.wip = new AtomicInteger();
this.tasks = new CompositeSubscription();
this.service = GenericScheduledExecutorService.getInstance();
this.site = site;
}

@Override
public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.unsubscribed();
}
ScheduledAction ea = new ScheduledAction(action, tasks);
ScheduledAction ea = new ScheduledAction(action, this);
tasks.add(ea);
queue.offer(ea);
if (wip.getAndIncrement() == 0) {
Expand Down Expand Up @@ -180,5 +188,19 @@ public void unsubscribe() {
queue.clear();
}

@Override
public void add(ScheduledAction action) {
tasks.add(action);
}

@Override
public void remove(ScheduledAction action) {
tasks.remove(action);
}

@Override
public Throwable workerCreationSite() {
return site;
}
}
}
6 changes: 5 additions & 1 deletion src/main/java/rx/internal/schedulers/NewThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public NewThreadScheduler(ThreadFactory threadFactory) {

@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
Throwable site = null;
if (WorkerDebugSupport.isEnabled()) {
site = new RuntimeException("createWorker() called");
}
return new NewThreadWorker(threadFactory, site);
}
}
65 changes: 30 additions & 35 deletions src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
/**
* @warn class description missing
*/
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
public class NewThreadWorker extends Scheduler.Worker implements Subscription, WorkerCallback {
final Throwable site;

private final ScheduledExecutorService executor;
private final RxJavaSchedulersHook schedulersHook;
volatile boolean isUnsubscribed;
Expand Down Expand Up @@ -200,15 +202,16 @@ static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executo
}

/* package */
public NewThreadWorker(ThreadFactory threadFactory) {
public NewThreadWorker(ThreadFactory threadFactory, Throwable site) {
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
boolean cancelSupported = tryEnableCancelPolicy(exec);
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
registerExecutor((ScheduledThreadPoolExecutor)exec);
}
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
executor = exec;
this.site = site;
this.schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
this.executor = exec;
}

@Override
Expand All @@ -221,7 +224,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
if (isUnsubscribed) {
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
return scheduleActual(action, delayTime, unit, this);
}

/**
Expand All @@ -230,25 +233,18 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
* @param action the action to wrap and schedule
* @param delayTime the delay in execution
* @param unit the time unit of the delay
* @param parent the optional parent callback in case the action needs to be tracked
* @return the wrapper ScheduledAction
*/
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, WorkerCallback parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction);
Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
ScheduledAction run;
if (parent != null) {
run = new ScheduledAction(decoratedAction, parent);
parent.add(run);
} else {
f = executor.schedule(run, delayTime, unit);
run = new ScheduledAction(decoratedAction);
}
run.add(f);

return run;
}
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
Expand All @@ -261,22 +257,6 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
return run;
}

public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
Action0 decoratedAction = schedulersHook.onSchedule(action);
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
parent.add(run);

Future<?> f;
if (delayTime <= 0) {
f = executor.submit(run);
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(f);

return run;
}

@Override
public void unsubscribe() {
isUnsubscribed = true;
Expand All @@ -288,4 +268,19 @@ public void unsubscribe() {
public boolean isUnsubscribed() {
return isUnsubscribed;
}

@Override
public void add(ScheduledAction action) {
// NewThreadWorker doesn't track tasks on its own
}

@Override
public void remove(ScheduledAction action) {
// NewThreadWorker doesn't track tasks on its own
}

@Override
public Throwable workerCreationSite() {
return site;
}
}
Loading