20
20
import rx .Subscriber ;
21
21
import rx .exceptions .CompositeException ;
22
22
import rx .exceptions .Exceptions ;
23
+ import rx .exceptions .OnCompletedFailedException ;
23
24
import rx .exceptions .OnErrorFailedException ;
24
25
import rx .exceptions .OnErrorNotImplementedException ;
25
- import rx .plugins .RxJavaPlugins ;
26
+ import rx .exceptions .UnsubscribeFailedException ;
27
+ import rx .internal .util .RxJavaPluginUtils ;
26
28
27
29
/**
28
30
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
@@ -83,11 +85,17 @@ public void onCompleted() {
83
85
// we handle here instead of another method so we don't add stacks to the frame
84
86
// which can prevent it from being able to handle StackOverflow
85
87
Exceptions .throwIfFatal (e );
86
- // handle errors if the onCompleted implementation fails, not just if the Observable fails
87
- _onError ( e );
88
+ RxJavaPluginUtils . handleException ( e );
89
+ throw new OnCompletedFailedException ( e . getMessage (), e );
88
90
} finally {
89
- // auto-unsubscribe
90
- unsubscribe ();
91
+ try {
92
+ // Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
93
+ // and we throw an UnsubscribeFailureException.
94
+ unsubscribe ();
95
+ } catch (Throwable e ) {
96
+ RxJavaPluginUtils .handleException (e );
97
+ throw new UnsubscribeFailedException (e .getMessage (), e );
98
+ }
91
99
}
92
100
}
93
101
}
@@ -145,11 +153,7 @@ public void onNext(T args) {
145
153
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
146
154
*/
147
155
protected void _onError (Throwable e ) {
148
- try {
149
- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (e );
150
- } catch (Throwable pluginException ) {
151
- handlePluginException (pluginException );
152
- }
156
+ RxJavaPluginUtils .handleException (e );
153
157
try {
154
158
actual .onError (e );
155
159
} catch (Throwable e2 ) {
@@ -168,11 +172,7 @@ protected void _onError(Throwable e) {
168
172
try {
169
173
unsubscribe ();
170
174
} catch (Throwable unsubscribeException ) {
171
- try {
172
- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
173
- } catch (Throwable pluginException ) {
174
- handlePluginException (pluginException );
175
- }
175
+ RxJavaPluginUtils .handleException (unsubscribeException );
176
176
throw new RuntimeException ("Observer.onError not implemented and error while unsubscribing." , new CompositeException (Arrays .asList (e , unsubscribeException )));
177
177
}
178
178
throw (OnErrorNotImplementedException ) e2 ;
@@ -182,19 +182,11 @@ protected void _onError(Throwable e) {
182
182
*
183
183
* https://github.com/ReactiveX/RxJava/issues/198
184
184
*/
185
- try {
186
- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (e2 );
187
- } catch (Throwable pluginException ) {
188
- handlePluginException (pluginException );
189
- }
185
+ RxJavaPluginUtils .handleException (e2 );
190
186
try {
191
187
unsubscribe ();
192
188
} catch (Throwable unsubscribeException ) {
193
- try {
194
- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
195
- } catch (Throwable pluginException ) {
196
- handlePluginException (pluginException );
197
- }
189
+ RxJavaPluginUtils .handleException (unsubscribeException );
198
190
throw new OnErrorFailedException ("Error occurred when trying to propagate error to Observer.onError and during unsubscription." , new CompositeException (Arrays .asList (e , e2 , unsubscribeException )));
199
191
}
200
192
@@ -205,25 +197,11 @@ protected void _onError(Throwable e) {
205
197
try {
206
198
unsubscribe ();
207
199
} catch (RuntimeException unsubscribeException ) {
208
- try {
209
- RxJavaPlugins .getInstance ().getErrorHandler ().handleError (unsubscribeException );
210
- } catch (Throwable pluginException ) {
211
- handlePluginException (pluginException );
212
- }
200
+ RxJavaPluginUtils .handleException (unsubscribeException );
213
201
throw new OnErrorFailedException (unsubscribeException );
214
202
}
215
203
}
216
204
217
- private void handlePluginException (Throwable pluginException ) {
218
- /*
219
- * We don't want errors from the plugin to affect normal flow.
220
- * Since the plugin should never throw this is a safety net
221
- * and will complain loudly to System.err so it gets fixed.
222
- */
223
- System .err .println ("RxJavaErrorHandler threw an Exception. It shouldn't. => " + pluginException .getMessage ());
224
- pluginException .printStackTrace ();
225
- }
226
-
227
205
/**
228
206
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
229
207
*
0 commit comments