Skip to content

Commit 4b61060

Browse files
akarnokdzsxwing
authored andcommitted
1.x: add Completable.safeSubscribe option + RxJavaPlugins hook support (#3942)
Add option to safely subscribe a CompletableSubscriber / regular Subscriber and handle onXXX failures.
1 parent 44947d9 commit 4b61060

18 files changed

+685
-109
lines changed

src/main/java/rx/Completable.java

Lines changed: 120 additions & 60 deletions
Large diffs are not rendered by default.

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2024,7 +2024,7 @@ public void onSubscribe(Subscription d) {
20242024
serial.add(main);
20252025
child.add(serial);
20262026

2027-
other.subscribe(so);
2027+
other.unsafeSubscribe(so);
20282028

20292029
return main;
20302030
}

src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ void next() {
130130
return;
131131
}
132132

133-
c.subscribe(inner);
133+
c.unsafeSubscribe(inner);
134134
}
135135

136136
final class ConcatInnerSubscriber implements CompletableSubscriber {

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ void next() {
8989
return;
9090
}
9191

92-
a[idx].subscribe(this);
92+
a[idx].unsafeSubscribe(this);
9393
} while (decrementAndGet() != 0);
9494
}
9595
}

src/main/java/rx/internal/operators/CompletableOnSubscribeConcatIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ void next() {
128128
return;
129129
}
130130

131-
c.subscribe(this);
131+
c.unsafeSubscribe(this);
132132
} while (decrementAndGet() != 0);
133133
}
134134
}

src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void onNext(Completable t) {
101101

102102
wip.getAndIncrement();
103103

104-
t.subscribe(new CompletableSubscriber() {
104+
t.unsafeSubscribe(new CompletableSubscriber() {
105105
Subscription d;
106106
boolean innerDone;
107107
@Override

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public void call(final CompletableSubscriber s) {
5454
}
5555
}
5656

57-
c.subscribe(new CompletableSubscriber() {
57+
c.unsafeSubscribe(new CompletableSubscriber() {
5858
@Override
5959
public void onSubscribe(Subscription d) {
6060
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public void call(final CompletableSubscriber s) {
5151
continue;
5252
}
5353

54-
c.subscribe(new CompletableSubscriber() {
54+
c.unsafeSubscribe(new CompletableSubscriber() {
5555
@Override
5656
public void onSubscribe(Subscription d) {
5757
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeDelayErrorIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void call(final CompletableSubscriber s) {
117117

118118
wip.getAndIncrement();
119119

120-
c.subscribe(new CompletableSubscriber() {
120+
c.unsafeSubscribe(new CompletableSubscriber() {
121121
@Override
122122
public void onSubscribe(Subscription d) {
123123
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeMergeIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public void call(final CompletableSubscriber s) {
110110

111111
wip.getAndIncrement();
112112

113-
c.subscribe(new CompletableSubscriber() {
113+
c.unsafeSubscribe(new CompletableSubscriber() {
114114
@Override
115115
public void onSubscribe(Subscription d) {
116116
set.add(d);

src/main/java/rx/internal/operators/CompletableOnSubscribeTimeout.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public void call() {
6060
if (other == null) {
6161
s.onError(new TimeoutException());
6262
} else {
63-
other.subscribe(new CompletableSubscriber() {
63+
other.unsafeSubscribe(new CompletableSubscriber() {
6464

6565
@Override
6666
public void onSubscribe(Subscription d) {
@@ -85,7 +85,7 @@ public void onCompleted() {
8585
}
8686
}, timeout, unit);
8787

88-
source.subscribe(new CompletableSubscriber() {
88+
source.unsafeSubscribe(new CompletableSubscriber() {
8989

9090
@Override
9191
public void onSubscribe(Subscription d) {
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.observers;
17+
18+
import rx.Completable.CompletableSubscriber;
19+
import rx.Subscription;
20+
import rx.annotations.Experimental;
21+
import rx.exceptions.*;
22+
import rx.internal.util.RxJavaPluginUtils;
23+
24+
/**
25+
* Wraps another CompletableSubscriber and handles exceptions thrown
26+
* from onError and onCompleted.
27+
*
28+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
29+
*/
30+
@Experimental
31+
public final class SafeCompletableSubscriber implements CompletableSubscriber, Subscription {
32+
final CompletableSubscriber actual;
33+
34+
Subscription s;
35+
36+
boolean done;
37+
38+
public SafeCompletableSubscriber(CompletableSubscriber actual) {
39+
this.actual = actual;
40+
}
41+
42+
@Override
43+
public void onCompleted() {
44+
if (done) {
45+
return;
46+
}
47+
done = true;
48+
try {
49+
actual.onCompleted();
50+
} catch (Throwable ex) {
51+
Exceptions.throwIfFatal(ex);
52+
53+
throw new OnCompletedFailedException(ex);
54+
}
55+
}
56+
57+
@Override
58+
public void onError(Throwable e) {
59+
RxJavaPluginUtils.handleException(e);
60+
if (done) {
61+
return;
62+
}
63+
done = true;
64+
try {
65+
actual.onError(e);
66+
} catch (Throwable ex) {
67+
Exceptions.throwIfFatal(ex);
68+
69+
throw new OnErrorFailedException(new CompositeException(e, ex));
70+
}
71+
}
72+
73+
@Override
74+
public void onSubscribe(Subscription d) {
75+
this.s = d;
76+
try {
77+
actual.onSubscribe(this);
78+
} catch (Throwable ex) {
79+
Exceptions.throwIfFatal(ex);
80+
d.unsubscribe();
81+
onError(ex);
82+
}
83+
}
84+
85+
@Override
86+
public void unsubscribe() {
87+
s.unsubscribe();
88+
}
89+
90+
@Override
91+
public boolean isUnsubscribed() {
92+
return done || s.isUnsubscribed();
93+
}
94+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
import rx.*;
19+
import rx.annotations.Experimental;
20+
import rx.functions.Func1;
21+
22+
/**
23+
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Completable} execution with a
24+
* default no-op implementation.
25+
* <p>
26+
* See {@link RxJavaPlugins} or the RxJava GitHub Uncyclo for information on configuring plugins:
27+
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
28+
* <p>
29+
* <b>Note on thread-safety and performance:</b>
30+
* <p>
31+
* A single implementation of this class will be used globally so methods on this class will be invoked
32+
* concurrently from multiple threads so all functionality must be thread-safe.
33+
* <p>
34+
* Methods are also invoked synchronously and will add to execution time of the completable so all behavior
35+
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
36+
* worker threads.
37+
*
38+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
39+
*/
40+
@Experimental
41+
public abstract class RxJavaCompletableExecutionHook {
42+
/**
43+
* Invoked during the construction by {@link Completable#create(Completable.CompletableOnSubscribe)}
44+
* <p>
45+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
46+
* logging, metrics and other such things and pass through the function.
47+
*
48+
* @param f
49+
* original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed
50+
* @return {@link rx.Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just
51+
* returned as a pass through
52+
*/
53+
public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) {
54+
return f;
55+
}
56+
57+
/**
58+
* Invoked before {@link Completable#subscribe(Subscriber)} is about to be executed.
59+
* <p>
60+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
61+
* logging, metrics and other such things and pass through the function.
62+
*
63+
* @param completableInstance the target completable instance
64+
* @param onSubscribe
65+
* original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed
66+
* @return {@link rx.Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
67+
* returned as a pass through
68+
*/
69+
public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) {
70+
// pass through by default
71+
return onSubscribe;
72+
}
73+
74+
/**
75+
* Invoked after failed execution of {@link Completable#subscribe(Subscriber)} with thrown Throwable.
76+
* <p>
77+
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
78+
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
79+
*
80+
* @param e
81+
* Throwable thrown by {@link Completable#subscribe(Subscriber)}
82+
* @return Throwable that can be decorated, replaced or just returned as a pass through
83+
*/
84+
public Throwable onSubscribeError(Throwable e) {
85+
// pass through by default
86+
return e;
87+
}
88+
89+
/**
90+
* Invoked just as the operator functions is called to bind two operations together into a new
91+
* {@link Completable} and the return value is used as the lifted function
92+
* <p>
93+
* This can be used to decorate or replace the {@link rx.Completable.CompletableOperator} instance or just perform extra
94+
* logging, metrics and other such things and pass through the onSubscribe.
95+
*
96+
* @param lift
97+
* original {@link rx.Completable.CompletableOperator}{@code <R, T>}
98+
* @return {@link rx.Completable.CompletableOperator}{@code <R, T>} function that can be modified, decorated, replaced or just
99+
* returned as a pass through
100+
*/
101+
public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) {
102+
return lift;
103+
}
104+
}

src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class RxJavaPlugins {
5252
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
5353
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
5454
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
55+
private final AtomicReference<RxJavaCompletableExecutionHook> completableExecutionHook = new AtomicReference<RxJavaCompletableExecutionHook>();
5556
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();
5657

5758
/**
@@ -211,6 +212,52 @@ public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
211212
}
212213
}
213214

215+
/**
216+
* Retrieves the instance of {@link RxJavaCompletableExecutionHook} to use based on order of precedence as
217+
* defined in {@link RxJavaPlugins} class header.
218+
* <p>
219+
* Override the default by calling {@link #registerCompletableExecutionHook(RxJavaCompletableExecutionHook)}
220+
* or by setting the property {@code rxjava.plugin.RxJavaCompletableExecutionHook.implementation} with the
221+
* full classname to load.
222+
*
223+
* @return {@link RxJavaCompletableExecutionHook} implementation to use
224+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
225+
*/
226+
@Experimental
227+
public RxJavaCompletableExecutionHook getCompletableExecutionHook() {
228+
if (completableExecutionHook.get() == null) {
229+
// check for an implementation from System.getProperty first
230+
Object impl = getPluginImplementationViaProperty(RxJavaCompletableExecutionHook.class, System.getProperties());
231+
if (impl == null) {
232+
// nothing set via properties so initialize with default
233+
completableExecutionHook.compareAndSet(null, new RxJavaCompletableExecutionHook() { });
234+
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
235+
} else {
236+
// we received an implementation from the system property so use it
237+
completableExecutionHook.compareAndSet(null, (RxJavaCompletableExecutionHook) impl);
238+
}
239+
}
240+
return completableExecutionHook.get();
241+
}
242+
243+
/**
244+
* Register an {@link RxJavaCompletableExecutionHook} implementation as a global override of any injected or
245+
* default implementations.
246+
*
247+
* @param impl
248+
* {@link RxJavaCompletableExecutionHook} implementation
249+
* @throws IllegalStateException
250+
* if called more than once or after the default was initialized (if usage occurs before trying
251+
* to register)
252+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
253+
*/
254+
@Experimental
255+
public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) {
256+
if (!completableExecutionHook.compareAndSet(null, impl)) {
257+
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
258+
}
259+
}
260+
214261
/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties propsIn) {
215262
// Make a defensive clone because traversal may fail with ConcurrentModificationException
216263
// if the properties get changed by something outside RxJava.

0 commit comments

Comments
 (0)