Skip to content

Commit d71314e

Browse files
committed
Merge pull request #3155 from davidmoten/safe-sub-catch
SafeSubscriber - report onCompleted unsubscribe error to RxJavaPlugin
2 parents 189928c + 5fec06f commit d71314e

File tree

6 files changed

+223
-66
lines changed

6 files changed

+223
-66
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.exceptions;
17+
18+
public final class OnCompletedFailedException extends RuntimeException {
19+
20+
private static final long serialVersionUID = 8622579378868820554L;
21+
22+
public OnCompletedFailedException(Throwable throwable) {
23+
super(throwable);
24+
}
25+
26+
public OnCompletedFailedException(String message, Throwable throwable) {
27+
super(message, throwable);
28+
}
29+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.exceptions;
17+
18+
public final class UnsubscribeFailedException extends RuntimeException {
19+
20+
private static final long serialVersionUID = 4594672310593167598L;
21+
22+
public UnsubscribeFailedException(Throwable throwable) {
23+
super(throwable);
24+
}
25+
26+
public UnsubscribeFailedException(String message, Throwable throwable) {
27+
super(message, throwable);
28+
}
29+
30+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.util;
17+
18+
import rx.plugins.RxJavaPlugins;
19+
20+
public final class RxJavaPluginUtils {
21+
22+
public static void handleException(Throwable e) {
23+
try {
24+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
25+
} catch (Throwable pluginException) {
26+
handlePluginException(pluginException);
27+
}
28+
}
29+
30+
private static void handlePluginException(Throwable pluginException) {
31+
/*
32+
* We don't want errors from the plugin to affect normal flow.
33+
* Since the plugin should never throw this is a safety net
34+
* and will complain loudly to System.err so it gets fixed.
35+
*/
36+
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
37+
pluginException.printStackTrace();
38+
}
39+
40+
}

src/main/java/rx/observers/SafeSubscriber.java

Lines changed: 18 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
import rx.Subscriber;
2121
import rx.exceptions.CompositeException;
2222
import rx.exceptions.Exceptions;
23+
import rx.exceptions.OnCompletedFailedException;
2324
import rx.exceptions.OnErrorFailedException;
2425
import rx.exceptions.OnErrorNotImplementedException;
25-
import rx.plugins.RxJavaPlugins;
26+
import rx.exceptions.UnsubscribeFailedException;
27+
import rx.internal.util.RxJavaPluginUtils;
2628

2729
/**
2830
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
@@ -83,11 +85,17 @@ public void onCompleted() {
8385
// we handle here instead of another method so we don't add stacks to the frame
8486
// which can prevent it from being able to handle StackOverflow
8587
Exceptions.throwIfFatal(e);
86-
// handle errors if the onCompleted implementation fails, not just if the Observable fails
87-
_onError(e);
88+
RxJavaPluginUtils.handleException(e);
89+
throw new OnCompletedFailedException(e.getMessage(), e);
8890
} finally {
89-
// auto-unsubscribe
90-
unsubscribe();
91+
try {
92+
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
93+
// and we throw an UnsubscribeFailureException.
94+
unsubscribe();
95+
} catch (Throwable e) {
96+
RxJavaPluginUtils.handleException(e);
97+
throw new UnsubscribeFailedException(e.getMessage(), e);
98+
}
9199
}
92100
}
93101
}
@@ -145,11 +153,7 @@ public void onNext(T args) {
145153
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
146154
*/
147155
protected void _onError(Throwable e) {
148-
try {
149-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
150-
} catch (Throwable pluginException) {
151-
handlePluginException(pluginException);
152-
}
156+
RxJavaPluginUtils.handleException(e);
153157
try {
154158
actual.onError(e);
155159
} catch (Throwable e2) {
@@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
168172
try {
169173
unsubscribe();
170174
} catch (Throwable unsubscribeException) {
171-
try {
172-
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
173-
} catch (Throwable pluginException) {
174-
handlePluginException(pluginException);
175-
}
175+
RxJavaPluginUtils.handleException(unsubscribeException);
176176
throw new RuntimeException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException)));
177177
}
178178
throw (OnErrorNotImplementedException) e2;
@@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
182182
*
183183
* https://github.com/ReactiveX/RxJava/issues/198
184184
*/
185-
try {
186-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
187-
} catch (Throwable pluginException) {
188-
handlePluginException(pluginException);
189-
}
185+
RxJavaPluginUtils.handleException(e2);
190186
try {
191187
unsubscribe();
192188
} catch (Throwable unsubscribeException) {
193-
try {
194-
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
195-
} catch (Throwable pluginException) {
196-
handlePluginException(pluginException);
197-
}
189+
RxJavaPluginUtils.handleException(unsubscribeException);
198190
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
199191
}
200192

@@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
205197
try {
206198
unsubscribe();
207199
} catch (RuntimeException unsubscribeException) {
208-
try {
209-
RxJavaPlugins.getInstance().getErrorHandler().handleError(unsubscribeException);
210-
} catch (Throwable pluginException) {
211-
handlePluginException(pluginException);
212-
}
200+
RxJavaPluginUtils.handleException(unsubscribeException);
213201
throw new OnErrorFailedException(unsubscribeException);
214202
}
215203
}
216204

217-
private void handlePluginException(Throwable pluginException) {
218-
/*
219-
* We don't want errors from the plugin to affect normal flow.
220-
* Since the plugin should never throw this is a safety net
221-
* and will complain loudly to System.err so it gets fixed.
222-
*/
223-
System.err.println("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException.getMessage());
224-
pluginException.printStackTrace();
225-
}
226-
227205
/**
228206
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
229207
*

src/test/java/rx/observers/SafeObserverTest.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import org.junit.Test;
2424

25+
import junit.framework.Assert;
2526
import rx.Subscriber;
2627
import rx.exceptions.*;
2728
import rx.functions.Action0;
@@ -68,19 +69,6 @@ public void onCompletedFailure() {
6869
}
6970
}
7071

71-
@Test
72-
public void onCompletedFailureSafe() {
73-
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
74-
try {
75-
new SafeSubscriber<String>(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
76-
assertNotNull(onError.get());
77-
assertTrue(onError.get() instanceof SafeObserverTestException);
78-
assertEquals("onCompletedFail", onError.get().getMessage());
79-
} catch (Exception e) {
80-
fail("expects exception to be passed to onError");
81-
}
82-
}
83-
8472
@Test
8573
public void onErrorFailure() {
8674
try {
@@ -184,8 +172,8 @@ public void call() {
184172
e.printStackTrace();
185173

186174
assertTrue(o.isUnsubscribed());
187-
188-
assertTrue(e instanceof SafeObserverTestException);
175+
assertTrue(e instanceof UnsubscribeFailedException);
176+
assertTrue(e.getCause() instanceof SafeObserverTestException);
189177
assertEquals("failure from unsubscribe", e.getMessage());
190178
// expected since onError fails so SafeObserver can't help
191179
}
@@ -475,9 +463,12 @@ public void onCompleted() {
475463
}
476464
});
477465

478-
s.onCompleted();
479-
480-
assertTrue("Error not received", error.get() instanceof TestException);
466+
try {
467+
s.onCompleted();
468+
Assert.fail();
469+
} catch (OnCompletedFailedException e) {
470+
assertNull(error.get());
471+
}
481472
}
482473

483474
@Test

0 commit comments

Comments
 (0)