Skip to content

Commit 671eba5

Browse files
committed
Change the workers to capture the stack trace for all subsequent scheduled actions to increase the readability of uncaught and fatal exceptions that bubble up to the schedulers.
1 parent d43c05c commit 671eba5

File tree

7 files changed

+119
-9
lines changed

7 files changed

+119
-9
lines changed

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ ThreadWorker get() {
7878
while (!expiringWorkerQueue.isEmpty()) {
7979
ThreadWorker threadWorker = expiringWorkerQueue.poll();
8080
if (threadWorker != null) {
81+
threadWorker.resetContext();
8182
return threadWorker;
8283
}
8384
}

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private static class EventLoopWorker extends Scheduler.Worker {
141141

142142
EventLoopWorker(PoolWorker poolWorker) {
143143
this.poolWorker = poolWorker;
144-
144+
poolWorker.resetContext();
145145
}
146146

147147
@Override

src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public Worker createWorker() {
4646

4747
/** Worker that schedules tasks on the executor indirectly through a trampoline mechanism. */
4848
static final class ExecutorSchedulerWorker extends Scheduler.Worker implements Runnable {
49+
private final Throwable creationContext = SchedulerContextException.create();
4950
final Executor executor;
5051
// TODO: use a better performing structure for task tracking
5152
final CompositeSubscription tasks;
@@ -68,7 +69,7 @@ public Subscription schedule(Action0 action) {
6869
if (isUnsubscribed()) {
6970
return Subscriptions.unsubscribed();
7071
}
71-
ScheduledAction ea = new ScheduledAction(action, tasks);
72+
ScheduledAction ea = new ScheduledAction(action, tasks, creationContext);
7273
tasks.add(ea);
7374
queue.offer(ea);
7475
if (wip.getAndIncrement() == 0) {
@@ -150,7 +151,7 @@ public void call() {
150151
((ScheduledAction)s2).add(removeMas);
151152
}
152153
}
153-
});
154+
}, this.creationContext);
154155
// This will make sure if ea.call() gets executed before this line
155156
// we don't override the current task in mas.
156157
first.set(ea);

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
* @warn class description missing
3434
*/
3535
public class NewThreadWorker extends Scheduler.Worker implements Subscription {
36+
private Throwable creationContext = SchedulerContextException.create();
3637
private final ScheduledExecutorService executor;
3738
private final RxJavaSchedulersHook schedulersHook;
3839
volatile boolean isUnsubscribed;
@@ -233,7 +234,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
233234
*/
234235
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
235236
Action0 decoratedAction = schedulersHook.onSchedule(action);
236-
ScheduledAction run = new ScheduledAction(decoratedAction);
237+
ScheduledAction run = new ScheduledAction(decoratedAction, creationContext);
237238
Future<?> f;
238239
if (delayTime <= 0) {
239240
f = executor.submit(run);
@@ -246,7 +247,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
246247
}
247248
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, CompositeSubscription parent) {
248249
Action0 decoratedAction = schedulersHook.onSchedule(action);
249-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
250+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
250251
parent.add(run);
251252

252253
Future<?> f;
@@ -262,7 +263,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
262263

263264
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, SubscriptionList parent) {
264265
Action0 decoratedAction = schedulersHook.onSchedule(action);
265-
ScheduledAction run = new ScheduledAction(decoratedAction, parent);
266+
ScheduledAction run = new ScheduledAction(decoratedAction, parent, creationContext);
266267
parent.add(run);
267268

268269
Future<?> f;
@@ -287,4 +288,8 @@ public void unsubscribe() {
287288
public boolean isUnsubscribed() {
288289
return isUnsubscribed;
289290
}
291+
292+
public void resetContext() {
293+
creationContext = SchedulerContextException.create();
294+
}
290295
}

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.*;
2020

2121
import rx.Subscription;
22+
import rx.exceptions.Exceptions;
2223
import rx.exceptions.OnErrorNotImplementedException;
2324
import rx.functions.Action0;
2425
import rx.internal.util.SubscriptionList;
@@ -34,18 +35,22 @@ public final class ScheduledAction extends AtomicReference<Thread> implements Ru
3435
private static final long serialVersionUID = -3962399486978279857L;
3536
final SubscriptionList cancel;
3637
final Action0 action;
38+
final Throwable creationContext;
3739

38-
public ScheduledAction(Action0 action) {
40+
public ScheduledAction(Action0 action, Throwable creationContext) {
3941
this.action = action;
4042
this.cancel = new SubscriptionList();
43+
this.creationContext = creationContext;
4144
}
42-
public ScheduledAction(Action0 action, CompositeSubscription parent) {
45+
public ScheduledAction(Action0 action, CompositeSubscription parent, Throwable creationContext) {
4346
this.action = action;
4447
this.cancel = new SubscriptionList(new Remover(this, parent));
48+
this.creationContext = creationContext;
4549
}
46-
public ScheduledAction(Action0 action, SubscriptionList parent) {
50+
public ScheduledAction(Action0 action, SubscriptionList parent, Throwable creationContext) {
4751
this.action = action;
4852
this.cancel = new SubscriptionList(new Remover2(this, parent));
53+
this.creationContext = creationContext;
4954
}
5055

5156
@Override
@@ -61,6 +66,7 @@ public void run() {
6166
} else {
6267
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
6368
}
69+
Exceptions.addCause(ie, creationContext);
6470
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
6571
Thread thread = Thread.currentThread();
6672
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.schedulers;
17+
18+
/**
19+
* Used only for providing context around where work was scheduled should an error occur in a different thread.
20+
*/
21+
public class SchedulerContextException extends Exception {
22+
/**
23+
* Constant to use when disabled
24+
*/
25+
private static final Throwable CONTEXT_MISSING = new SchedulerContextException("Missing context. Enable by setting the system property \"rxjava.captureSchedulerContext=true\"");
26+
27+
static {
28+
CONTEXT_MISSING.setStackTrace(new StackTraceElement[0]);
29+
}
30+
31+
/**
32+
* @return a {@link Throwable} that captures the stack trace or a {@link Throwable} that documents how to enable the feature if needed.
33+
*/
34+
public static Throwable create() {
35+
String def = "false";
36+
String setTo = System.getProperty("rxjava.captureSchedulerContext", def);
37+
return setTo != def && "true".equals(setTo) ? new SchedulerContextException("Asynchronous work scheduled at") : CONTEXT_MISSING;
38+
}
39+
40+
private SchedulerContextException(String message) {
41+
super(message);
42+
}
43+
44+
private static final long serialVersionUID = 1L;
45+
}

src/test/java/rx/schedulers/AbstractSchedulerTests.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import rx.functions.Action0;
4444
import rx.functions.Action1;
4545
import rx.functions.Func1;
46+
import rx.plugins.RxJavaErrorHandler;
47+
import rx.plugins.RxJavaPlugins;
4648

4749
/**
4850
* Base tests for all schedulers including Immediate/Current.
@@ -502,4 +504,54 @@ public void onNext(T args) {
502504

503505
}
504506

507+
@Test
508+
public final void testStackTraceAcrossThreads() throws Throwable {
509+
final AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>();
510+
final CountDownLatch done = new CountDownLatch(1);
511+
System.setProperty("rxjava.captureSchedulerContext", "true");
512+
513+
try {
514+
515+
RxJavaPlugins.getInstance().reset();
516+
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
517+
@Override
518+
public void handleError(Throwable e) {
519+
exceptionRef.set(e);
520+
done.countDown();
521+
}
522+
});
523+
524+
try {
525+
getScheduler().createWorker().schedule(new Action0() {
526+
@Override
527+
public void call() {
528+
throw new RuntimeException();
529+
}
530+
});
531+
} catch (Exception e) {
532+
exceptionRef.set(e);
533+
done.countDown();
534+
}
535+
536+
done.await();
537+
538+
Throwable exception = exceptionRef.get();
539+
Throwable e = exception;
540+
while (e.getCause() != null) {
541+
e = e.getCause();
542+
}
543+
544+
StackTraceElement[] st = e.getStackTrace();
545+
for (StackTraceElement stackTraceElement : st) {
546+
if (stackTraceElement.getMethodName().equals("testStackTraceAcrossThreads")) {
547+
// pass we found this class in the stack trace.
548+
return;
549+
}
550+
}
551+
552+
throw exception;
553+
} finally {
554+
System.setProperty("rxjava.captureSchedulerContext", "false");
555+
}
556+
}
505557
}

0 commit comments

Comments
 (0)