Skip to content

Commit c78f3bc

Browse files
committed
1.x: add optional tracking of worker creation sites + report it on error
1 parent fef520e commit c78f3bc

File tree

10 files changed

+444
-143
lines changed

10 files changed

+444
-143
lines changed

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,16 @@ public void shutdown() {
170170

171171
@Override
172172
public Worker createWorker() {
173-
return new EventLoopWorker(pool.get());
173+
Throwable site = null;
174+
if (WorkerDebugSupport.isEnabled()) {
175+
site = new RuntimeException("createWorker() called");
176+
}
177+
return new EventLoopWorker(pool.get(), site);
174178
}
175179

176-
private static final class EventLoopWorker extends Scheduler.Worker {
180+
private static final class EventLoopWorker extends Scheduler.Worker
181+
implements WorkerCallback {
182+
final Throwable site;
177183
private final CompositeSubscription innerSubscription = new CompositeSubscription();
178184
private final CachedWorkerPool pool;
179185
private final ThreadWorker threadWorker;
@@ -182,8 +188,9 @@ private static final class EventLoopWorker extends Scheduler.Worker {
182188
static final AtomicIntegerFieldUpdater<EventLoopWorker> ONCE_UPDATER
183189
= AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once");
184190

185-
EventLoopWorker(CachedWorkerPool pool) {
191+
EventLoopWorker(CachedWorkerPool pool, Throwable site) {
186192
this.pool = pool;
193+
this.site = site;
187194
this.threadWorker = pool.get();
188195
}
189196

@@ -221,18 +228,31 @@ public void call() {
221228
}
222229
action.call();
223230
}
224-
}, delayTime, unit);
225-
innerSubscription.add(s);
226-
s.addParent(innerSubscription);
231+
}, delayTime, unit, this);
227232
return s;
228233
}
234+
235+
@Override
236+
public void add(ScheduledAction action) {
237+
innerSubscription.add(action);
238+
}
239+
240+
@Override
241+
public void remove(ScheduledAction action) {
242+
innerSubscription.remove(action);
243+
}
244+
245+
@Override
246+
public Throwable workerCreationSite() {
247+
return site;
248+
}
229249
}
230250

231251
private static final class ThreadWorker extends NewThreadWorker {
232252
private long expirationTime;
233253

234254
ThreadWorker(ThreadFactory threadFactory) {
235-
super(threadFactory);
255+
super(threadFactory, null);
236256
this.expirationTime = 0L;
237257
}
238258

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public final class EventLoopsScheduler extends Scheduler implements SchedulerLif
4545

4646
static final PoolWorker SHUTDOWN_WORKER;
4747
static {
48-
SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE);
48+
SHUTDOWN_WORKER = new PoolWorker(RxThreadFactory.NONE, null);
4949
SHUTDOWN_WORKER.unsubscribe();
5050
}
5151

@@ -60,7 +60,7 @@ static final class FixedSchedulerPool {
6060
this.cores = maxThreads;
6161
this.eventLoops = new PoolWorker[maxThreads];
6262
for (int i = 0; i < maxThreads; i++) {
63-
this.eventLoops[i] = new PoolWorker(threadFactory);
63+
this.eventLoops[i] = new PoolWorker(threadFactory, null);
6464
}
6565
}
6666

@@ -98,7 +98,11 @@ public EventLoopsScheduler(ThreadFactory threadFactory) {
9898

9999
@Override
100100
public Worker createWorker() {
101-
return new EventLoopWorker(pool.get().getEventLoop());
101+
Throwable site = null;
102+
if (WorkerDebugSupport.isEnabled()) {
103+
site = new RuntimeException("createWorker() called");
104+
}
105+
return new EventLoopWorker(pool.get().getEventLoop(), site);
102106
}
103107

104108
@Override
@@ -131,18 +135,51 @@ public void shutdown() {
131135
*/
132136
public Subscription scheduleDirect(Action0 action) {
133137
PoolWorker pw = pool.get().getEventLoop();
134-
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
138+
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, null);
135139
}
136140

137141
private static class EventLoopWorker extends Scheduler.Worker {
138142
private final SubscriptionList serial = new SubscriptionList();
139143
private final CompositeSubscription timed = new CompositeSubscription();
140144
private final SubscriptionList both = new SubscriptionList(serial, timed);
141145
private final PoolWorker poolWorker;
146+
final WorkerCallback timedCallback;
147+
final WorkerCallback serialCallback;
142148

143-
EventLoopWorker(PoolWorker poolWorker) {
149+
EventLoopWorker(PoolWorker poolWorker, final Throwable site) {
144150
this.poolWorker = poolWorker;
145-
151+
this.timedCallback = new WorkerCallback() {
152+
@Override
153+
public void add(ScheduledAction action) {
154+
timed.add(action);
155+
}
156+
157+
@Override
158+
public void remove(ScheduledAction action) {
159+
timed.remove(action);
160+
}
161+
162+
@Override
163+
public Throwable workerCreationSite() {
164+
return site;
165+
}
166+
};
167+
this.serialCallback = new WorkerCallback() {
168+
@Override
169+
public void add(ScheduledAction action) {
170+
serial.add(action);
171+
}
172+
173+
@Override
174+
public void remove(ScheduledAction action) {
175+
serial.remove(action);
176+
}
177+
178+
@Override
179+
public Throwable workerCreationSite() {
180+
return site;
181+
}
182+
};
146183
}
147184

148185
@Override
@@ -169,7 +206,7 @@ public void call() {
169206
}
170207
action.call();
171208
}
172-
}, 0, null, serial);
209+
}, 0, null, serialCallback);
173210
}
174211

175212
@Override
@@ -186,13 +223,13 @@ public void call() {
186223
}
187224
action.call();
188225
}
189-
}, delayTime, unit, timed);
226+
}, delayTime, unit, timedCallback);
190227
}
191228
}
192229

193230
static final class PoolWorker extends NewThreadWorker {
194-
PoolWorker(ThreadFactory threadFactory) {
195-
super(threadFactory);
231+
PoolWorker(ThreadFactory threadFactory, Throwable site) {
232+
super(threadFactory, site);
196233
}
197234
}
198235
}

src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,16 @@ public ExecutorScheduler(Executor executor) {
3737

3838
@Override
3939
public Worker createWorker() {
40-
return new ExecutorSchedulerWorker(executor);
40+
Throwable site = null;
41+
if (WorkerDebugSupport.isEnabled()) {
42+
site = new RuntimeException("createWorker() called");
43+
}
44+
return new ExecutorSchedulerWorker(executor, site);
4145
}
4246

4347
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
44-
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
48+
static final class ExecutorSchedulerWorker extends Scheduler.Worker
49+
implements Runnable, WorkerCallback {
4550
final Executor executor;
4651
// TODO: use a better performing structure for task tracking
4752
final CompositeSubscription tasks;
@@ -51,20 +56,23 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
5156

5257
final ScheduledExecutorService service;
5358

54-
public ExecutorSchedulerWorker(Executor executor) {
59+
final Throwable site;
60+
61+
public ExecutorSchedulerWorker(Executor executor, Throwable site) {
5562
this.executor = executor;
5663
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
5764
this.wip = new AtomicInteger();
5865
this.tasks = new CompositeSubscription();
5966
this.service = GenericScheduledExecutorService.getInstance();
67+
this.site = site;
6068
}
6169

6270
@Override
6371
public Subscription schedule(Action0 action) {
6472
if (isUnsubscribed()) {
6573
return Subscriptions.unsubscribed();
6674
}
67-
ScheduledAction ea = new ScheduledAction(action, tasks);
75+
ScheduledAction ea = new ScheduledAction(action, this);
6876
tasks.add(ea);
6977
queue.offer(ea);
7078
if (wip.getAndIncrement() == 0) {
@@ -180,5 +188,19 @@ public void unsubscribe() {
180188
queue.clear();
181189
}
182190

191+
@Override
192+
public void add(ScheduledAction action) {
193+
tasks.add(action);
194+
}
195+
196+
@Override
197+
public void remove(ScheduledAction action) {
198+
tasks.remove(action);
199+
}
200+
201+
@Override
202+
public Throwable workerCreationSite() {
203+
return site;
204+
}
183205
}
184206
}

src/main/java/rx/internal/schedulers/NewThreadScheduler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ public NewThreadScheduler(ThreadFactory threadFactory) {
3030

3131
@Override
3232
public Worker createWorker() {
33-
return new NewThreadWorker(threadFactory);
33+
Throwable site = null;
34+
if (WorkerDebugSupport.isEnabled()) {
35+
site = new RuntimeException("createWorker() called");
36+
}
37+
return new NewThreadWorker(threadFactory, site);
3438
}
3539
}

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,9 @@
3232
/**
3333
* @warn class description missing
3434
*/
35-
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
35+
public class NewThreadWorker extends Scheduler.Worker implements Subscription, WorkerCallback {
36+
final Throwable site;
37+
3638
private final ScheduledExecutorService executor;
3739
private final RxJavaSchedulersHook schedulersHook;
3840
volatile boolean isUnsubscribed;
@@ -200,15 +202,16 @@ static Method findSetRemoveOnCancelPolicyMethod(ScheduledExecutorService executo
200202
}
201203

202204
/* package */
203-
public NewThreadWorker(ThreadFactory threadFactory) {
205+
public NewThreadWorker(ThreadFactory threadFactory, Throwable site) {
204206
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory);
205207
// Java 7+: cancelled future tasks can be removed from the executor thus avoiding memory leak
206208
boolean cancelSupported = tryEnableCancelPolicy(exec);
207209
if (!cancelSupported && exec instanceof ScheduledThreadPoolExecutor) {
208210
registerExecutor((ScheduledThreadPoolExecutor)exec);
209211
}
210-
schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
211-
executor = exec;
212+
this.site = site;
213+
this.schedulersHook = RxJavaPlugins.getInstance().getSchedulersHook();
214+
this.executor = exec;
212215
}
213216

214217
@Override
@@ -221,7 +224,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
221224
if (isUnsubscribed) {
222225
return Subscriptions.unsubscribed();
223226
}
224-
return scheduleActual(action, delayTime, unit);
227+
return scheduleActual(action, delayTime, unit, this);
225228
}
226229

227230
/**
@@ -230,25 +233,18 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
230233
* @param action the action to wrap and schedule
231234
* @param delayTime the delay in execution
232235
* @param unit the time unit of the delay
236+
* @param parent the optional parent callback in case the action needs to be tracked
233237
* @return the wrapper ScheduledAction
234238
*/
235-
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
239+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, WorkerCallback parent) {
236240
Action0 decoratedAction = schedulersHook.onSchedule(action);
237-
ScheduledAction run = new ScheduledAction(decoratedAction);
238-
Future<?> f;
239-
if (delayTime <= 0) {
240-
f = executor.submit(run);
241+
ScheduledAction run;
242+
if (parent != null) {
243+
run = new ScheduledAction(decoratedAction, parent);
244+
parent.add(run);
241245
} else {
242-
f = executor.schedule(run, delayTime, unit);
246+
run = new ScheduledAction(decoratedAction);
243247
}
244-
run.add(f);
245-
246-
return run;
247-
}
248-
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
249-
Action0 decoratedAction = schedulersHook.onSchedule(action);
250-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
251-
parent.add(run);
252248

253249
Future<?> f;
254250
if (delayTime <= 0) {
@@ -261,22 +257,6 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
261257
return run;
262258
}
263259

264-
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
265-
Action0 decoratedAction = schedulersHook.onSchedule(action);
266-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
267-
parent.add(run);
268-
269-
Future<?> f;
270-
if (delayTime <= 0) {
271-
f = executor.submit(run);
272-
} else {
273-
f = executor.schedule(run, delayTime, unit);
274-
}
275-
run.add(f);
276-
277-
return run;
278-
}
279-
280260
@Override
281261
public void unsubscribe() {
282262
isUnsubscribed = true;
@@ -288,4 +268,19 @@ public void unsubscribe() {
288268
public boolean isUnsubscribed() {
289269
return isUnsubscribed;
290270
}
271+
272+
@Override
273+
public void add(ScheduledAction action) {
274+
// NewThreadWorker doesn't track tasks on its own
275+
}
276+
277+
@Override
278+
public void remove(ScheduledAction action) {
279+
// NewThreadWorker doesn't track tasks on its own
280+
}
281+
282+
@Override
283+
public Throwable workerCreationSite() {
284+
return site;
285+
}
291286
}

0 commit comments

Comments
 (0)