Skip to content

Commit 78b7b59

Browse files
Merge pull request #1766 from loganj/uncaught
Unhandled errors go to UncaughtExceptionHandler
2 parents 04cf882 + 8ffd742 commit 78b7b59

File tree

6 files changed

+166
-4
lines changed

6 files changed

+166
-4
lines changed

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,9 @@ public void run() {
5151
} else {
5252
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
5353
}
54-
ie.printStackTrace();
5554
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
55+
Thread thread = Thread.currentThread();
56+
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
5657
} finally {
5758
unsubscribe();
5859
}

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,9 @@ public static Scheduler trampoline() {
7575

7676
/**
7777
* Creates and returns a {@link Scheduler} that creates a new {@link Thread} for each unit of work.
78-
*
78+
* <p>
79+
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
80+
*
7981
* @return a {@link NewThreadScheduler} instance
8082
*/
8183
public static Scheduler newThread() {
@@ -88,7 +90,9 @@ public static Scheduler newThread() {
8890
* This can be used for event-loops, processing callbacks and other computational work.
8991
* <p>
9092
* Do not perform IO-bound work on this scheduler. Use {@link #io()} instead.
91-
*
93+
* <p>
94+
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
95+
*
9296
* @return a {@link Scheduler} meant for computation-bound work
9397
*/
9498
public static Scheduler computation() {
@@ -103,7 +107,9 @@ public static Scheduler computation() {
103107
* This can be used for asynchronously performing blocking IO.
104108
* <p>
105109
* Do not perform computational work on this scheduler. Use {@link #computation()} instead.
106-
*
110+
* <p>
111+
* Unhandled errors will be delivered to the scheduler Thread's {@link java.lang.Thread.UncaughtExceptionHandler}.
112+
*
107113
* @return a {@link Scheduler} meant for IO-bound work
108114
*/
109115
public static Scheduler io() {

src/test/java/rx/schedulers/CachedThreadSchedulerTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,4 +57,13 @@ public void call(String t) {
5757
});
5858
}
5959

60+
@Test
61+
public final void testUnhandledErrorIsDeliveredToThreadHandler() throws InterruptedException {
62+
SchedulerTests.testUnhandledErrorIsDeliveredToThreadHandler(getScheduler());
63+
}
64+
65+
@Test
66+
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
67+
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
68+
}
6069
}

src/test/java/rx/schedulers/ComputationSchedulerTests.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,14 @@ public void call(String t) {
137137
}
138138
});
139139
}
140+
141+
@Test
142+
public final void testUnhandledErrorIsDeliveredToThreadHandler() throws InterruptedException {
143+
SchedulerTests.testUnhandledErrorIsDeliveredToThreadHandler(getScheduler());
144+
}
145+
146+
@Test
147+
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
148+
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
149+
}
140150
}

src/test/java/rx/schedulers/NewThreadSchedulerTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package rx.schedulers;
1818

19+
import org.junit.Test;
1920
import rx.Scheduler;
2021

2122
public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
@@ -24,4 +25,14 @@ public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {
2425
protected Scheduler getScheduler() {
2526
return Schedulers.newThread();
2627
}
28+
29+
@Test
30+
public final void testUnhandledErrorIsDeliveredToThreadHandler() throws InterruptedException {
31+
SchedulerTests.testUnhandledErrorIsDeliveredToThreadHandler(getScheduler());
32+
}
33+
34+
@Test
35+
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
36+
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
37+
}
2738
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package rx.schedulers;
2+
3+
import rx.Observable;
4+
import rx.Observer;
5+
import rx.Scheduler;
6+
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import static org.junit.Assert.assertEquals;
11+
import static org.junit.Assert.fail;
12+
13+
final class SchedulerTests {
14+
private SchedulerTests() {
15+
// No instances.
16+
}
17+
18+
/**
19+
* Verifies that the given Scheduler delivers unhandled errors to its executing thread's
20+
* {@link java.lang.Thread.UncaughtExceptionHandler}.
21+
* <p>
22+
* Schedulers which execute on a separate thread from their calling thread should exhibit this behavior. Schedulers
23+
* which execute on their calling thread may not.
24+
*/
25+
static void testUnhandledErrorIsDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException {
26+
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
27+
try {
28+
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
29+
Thread.setDefaultUncaughtExceptionHandler(handler);
30+
IllegalStateException error = new IllegalStateException("Should be delivered to handler");
31+
Observable.error(error)
32+
.subscribeOn(scheduler)
33+
.subscribe();
34+
35+
if (!handler.completed.await(3, TimeUnit.SECONDS)) {
36+
fail("timed out");
37+
}
38+
39+
assertEquals("Should have received exactly 1 exception", 1, handler.count);
40+
Throwable cause = handler.caught;
41+
while (cause != null) {
42+
if (error.equals(cause)) break;
43+
if (cause == cause.getCause()) break;
44+
cause = cause.getCause();
45+
}
46+
assertEquals("Our error should have been delivered to the handler", error, cause);
47+
} finally {
48+
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
49+
}
50+
}
51+
52+
/**
53+
* Verifies that the given Scheduler does not deliver handled errors to its executing Thread's
54+
* {@link java.lang.Thread.UncaughtExceptionHandler}.
55+
* <p>
56+
* This is a companion test to {@link #testUnhandledErrorIsDeliveredToThreadHandler}, and is needed only for the
57+
* same Schedulers.
58+
*/
59+
static void testHandledErrorIsNotDeliveredToThreadHandler(Scheduler scheduler) throws InterruptedException {
60+
Thread.UncaughtExceptionHandler originalHandler = Thread.getDefaultUncaughtExceptionHandler();
61+
try {
62+
CapturingUncaughtExceptionHandler handler = new CapturingUncaughtExceptionHandler();
63+
CapturingObserver<Object> observer = new CapturingObserver<Object>();
64+
Thread.setDefaultUncaughtExceptionHandler(handler);
65+
IllegalStateException error = new IllegalStateException("Should be delivered to handler");
66+
Observable.error(error)
67+
.subscribeOn(scheduler)
68+
.subscribe(observer);
69+
70+
if (!observer.completed.await(3, TimeUnit.SECONDS)) {
71+
fail("timed out");
72+
}
73+
74+
assertEquals("Handler should not have received anything", 0, handler.count);
75+
assertEquals("Observer should have received an error", 1, observer.errorCount);
76+
assertEquals("Observer should not have received a next value", 0, observer.nextCount);
77+
78+
Throwable cause = observer.error;
79+
while (cause != null) {
80+
if (error.equals(cause)) break;
81+
if (cause == cause.getCause()) break;
82+
cause = cause.getCause();
83+
}
84+
assertEquals("Our error should have been delivered to the observer", error, cause);
85+
} finally {
86+
Thread.setDefaultUncaughtExceptionHandler(originalHandler);
87+
}
88+
}
89+
90+
private static final class CapturingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
91+
int count = 0;
92+
Throwable caught;
93+
CountDownLatch completed = new CountDownLatch(1);
94+
95+
@Override
96+
public void uncaughtException(Thread t, Throwable e) {
97+
count++;
98+
caught = e;
99+
completed.countDown();
100+
}
101+
}
102+
103+
private static final class CapturingObserver<T> implements Observer<T> {
104+
CountDownLatch completed = new CountDownLatch(1);
105+
int errorCount = 0;
106+
int nextCount = 0;
107+
Throwable error;
108+
109+
@Override
110+
public void onCompleted() {
111+
}
112+
113+
@Override
114+
public void onError(Throwable e) {
115+
errorCount++;
116+
error = e;
117+
completed.countDown();
118+
}
119+
120+
@Override
121+
public void onNext(T t) {
122+
nextCount++;
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)