Skip to content

Commit 00fdfaf

Browse files
committed
Merge pull request #3696 from phajduk/SingleHooks
1.x: Added Single execution hooks
2 parents f7321d2 + 0b8344b commit 00fdfaf

File tree

6 files changed

+313
-16
lines changed

6 files changed

+313
-16
lines changed

src/main/java/rx/Single.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.observers.SerializedSubscriber;
3030
import rx.plugins.RxJavaObservableExecutionHook;
3131
import rx.plugins.RxJavaPlugins;
32+
import rx.plugins.RxJavaSingleExecutionHook;
3233
import rx.schedulers.Schedulers;
3334
import rx.singles.BlockingSingle;
3435
import rx.subscriptions.Subscriptions;
@@ -101,7 +102,7 @@ private Single(final Observable.OnSubscribe<T> f) {
101102
this.onSubscribe = f;
102103
}
103104

104-
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
105+
static RxJavaSingleExecutionHook hook = RxJavaPlugins.getInstance().getSingleExecutionHook();
105106

106107
/**
107108
* Returns a Single that will execute the specified function when a {@link SingleSubscriber} executes it or
@@ -130,7 +131,7 @@ private Single(final Observable.OnSubscribe<T> f) {
130131
* @see <a href="http://reactivex.io/documentation/operators/create.html">ReactiveX operators documentation: Create</a>
131132
*/
132133
public static <T> Single<T> create(OnSubscribe<T> f) {
133-
return new Single<T>(f); // TODO need hook
134+
return new Single<T>(hook.onCreate(f));
134135
}
135136

136137
/**
@@ -1607,14 +1608,12 @@ public final void onNext(T args) {
16071608
* @param subscriber
16081609
* the Subscriber that will handle the emission or notification from the Single
16091610
*/
1610-
public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
1611+
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
16111612
try {
16121613
// new Subscriber so onStart it
16131614
subscriber.onStart();
1614-
// TODO add back the hook
1615-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1616-
onSubscribe.call(subscriber);
1617-
hook.onSubscribeReturn(subscriber);
1615+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1616+
return hook.onSubscribeReturn(subscriber);
16181617
} catch (Throwable e) {
16191618
// special handling for certain Throwable/Error/Exception types
16201619
Exceptions.throwIfFatal(e);
@@ -1631,6 +1630,7 @@ public final void unsafeSubscribe(Subscriber<? super T> subscriber) {
16311630
// TODO why aren't we throwing the hook's return value.
16321631
throw r;
16331632
}
1633+
return Subscriptions.unsubscribed();
16341634
}
16351635
}
16361636

@@ -1722,9 +1722,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
17221722
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
17231723
try {
17241724
// allow the hook to intercept and/or decorate
1725-
// TODO add back the hook
1726-
// hook.onSubscribeStart(this, onSubscribe).call(subscriber);
1727-
onSubscribe.call(subscriber);
1725+
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
17281726
return hook.onSubscribeReturn(subscriber);
17291727
} catch (Throwable e) {
17301728
// special handling for certain Throwable/Error/Exception types

src/main/java/rx/plugins/RxJavaPlugins.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public class RxJavaPlugins {
5050

5151
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
5252
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
53+
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
5354
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();
5455

5556
/**
@@ -68,6 +69,7 @@ public static RxJavaPlugins getInstance() {
6869
/* package accessible for unit tests */void reset() {
6970
INSTANCE.errorHandler.set(null);
7071
INSTANCE.observableExecutionHook.set(null);
72+
INSTANCE.singleExecutionHook.set(null);
7173
INSTANCE.schedulersHook.set(null);
7274
}
7375

@@ -156,6 +158,48 @@ public void registerObservableExecutionHook(RxJavaObservableExecutionHook impl)
156158
}
157159
}
158160

161+
/**
162+
* Retrieves the instance of {@link RxJavaSingleExecutionHook} to use based on order of precedence as
163+
* defined in {@link RxJavaPlugins} class header.
164+
* <p>
165+
* Override the default by calling {@link #registerSingleExecutionHook(RxJavaSingleExecutionHook)}
166+
* or by setting the property {@code rxjava.plugin.RxJavaSingleExecutionHook.implementation} with the
167+
* full classname to load.
168+
*
169+
* @return {@link RxJavaSingleExecutionHook} implementation to use
170+
*/
171+
public RxJavaSingleExecutionHook getSingleExecutionHook() {
172+
if (singleExecutionHook.get() == null) {
173+
// check for an implementation from System.getProperty first
174+
Object impl = getPluginImplementationViaProperty(RxJavaSingleExecutionHook.class, System.getProperties());
175+
if (impl == null) {
176+
// nothing set via properties so initialize with default
177+
singleExecutionHook.compareAndSet(null, RxJavaSingleExecutionHookDefault.getInstance());
178+
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
179+
} else {
180+
// we received an implementation from the system property so use it
181+
singleExecutionHook.compareAndSet(null, (RxJavaSingleExecutionHook) impl);
182+
}
183+
}
184+
return singleExecutionHook.get();
185+
}
186+
187+
/**
188+
* Register an {@link RxJavaSingleExecutionHook} implementation as a global override of any injected or
189+
* default implementations.
190+
*
191+
* @param impl
192+
* {@link RxJavaSingleExecutionHook} implementation
193+
* @throws IllegalStateException
194+
* if called more than once or after the default was initialized (if usage occurs before trying
195+
* to register)
196+
*/
197+
public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
198+
if (!singleExecutionHook.compareAndSet(null, impl)) {
199+
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
200+
}
201+
}
202+
159203
/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties props) {
160204
final String classSimpleName = pluginClass.getSimpleName();
161205
/*
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
import rx.Observable;
19+
import rx.Single;
20+
import rx.Subscriber;
21+
import rx.Subscription;
22+
import rx.functions.Func1;
23+
24+
/**
25+
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Single} execution with a
26+
* default no-op implementation.
27+
* <p>
28+
* See {@link RxJavaPlugins} or the RxJava GitHub Uncyclo for information on configuring plugins:
29+
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
30+
* <p>
31+
* <b>Note on thread-safety and performance:</b>
32+
* <p>
33+
* A single implementation of this class will be used globally so methods on this class will be invoked
34+
* concurrently from multiple threads so all functionality must be thread-safe.
35+
* <p>
36+
* Methods are also invoked synchronously and will add to execution time of the single so all behavior
37+
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
38+
* worker threads.
39+
*
40+
*/
41+
public abstract class RxJavaSingleExecutionHook {
42+
/**
43+
* Invoked during the construction by {@link Single#create(Single.OnSubscribe)}
44+
* <p>
45+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
46+
* logging, metrics and other such things and pass-thru the function.
47+
*
48+
* @param f
49+
* original {@link Single.OnSubscribe}<{@code T}> to be executed
50+
* @return {@link Single.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
51+
* returned as a pass-thru
52+
*/
53+
public <T> Single.OnSubscribe<T> onCreate(Single.OnSubscribe<T> f) {
54+
return f;
55+
}
56+
57+
/**
58+
* Invoked before {@link Single#subscribe(Subscriber)} is about to be executed.
59+
* <p>
60+
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
61+
* logging, metrics and other such things and pass-thru the function.
62+
*
63+
* @param onSubscribe
64+
* original {@link Observable.OnSubscribe}<{@code T}> to be executed
65+
* @return {@link Observable.OnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
66+
* returned as a pass-thru
67+
*/
68+
public <T> Observable.OnSubscribe<T> onSubscribeStart(Single<? extends T> singleInstance, final Observable.OnSubscribe<T> onSubscribe) {
69+
// pass-thru by default
70+
return onSubscribe;
71+
}
72+
73+
/**
74+
* Invoked after successful execution of {@link Single#subscribe(Subscriber)} with returned
75+
* {@link Subscription}.
76+
* <p>
77+
* This can be used to decorate or replace the {@link Subscription} instance or just perform extra logging,
78+
* metrics and other such things and pass-thru the subscription.
79+
*
80+
* @param subscription
81+
* original {@link Subscription}
82+
* @return {@link Subscription} subscription that can be modified, decorated, replaced or just returned as a
83+
* pass-thru
84+
*/
85+
public <T> Subscription onSubscribeReturn(Subscription subscription) {
86+
// pass-thru by default
87+
return subscription;
88+
}
89+
90+
/**
91+
* Invoked after failed execution of {@link Single#subscribe(Subscriber)} with thrown Throwable.
92+
* <p>
93+
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
94+
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
95+
*
96+
* @param e
97+
* Throwable thrown by {@link Single#subscribe(Subscriber)}
98+
* @return Throwable that can be decorated, replaced or just returned as a pass-thru
99+
*/
100+
public <T> Throwable onSubscribeError(Throwable e) {
101+
// pass-thru by default
102+
return e;
103+
}
104+
105+
/**
106+
* Invoked just as the operator functions is called to bind two operations together into a new
107+
* {@link Single} and the return value is used as the lifted function
108+
* <p>
109+
* This can be used to decorate or replace the {@link Observable.Operator} instance or just perform extra
110+
* logging, metrics and other such things and pass-thru the onSubscribe.
111+
*
112+
* @param lift
113+
* original {@link Observable.Operator}{@code <R, T>}
114+
* @return {@link Observable.Operator}{@code <R, T>} function that can be modified, decorated, replaced or just
115+
* returned as a pass-thru
116+
*/
117+
public <T, R> Observable.Operator<? extends R, ? super T> onLift(final Observable.Operator<? extends R, ? super T> lift) {
118+
return lift;
119+
}
120+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.plugins;
17+
18+
/**
19+
* Default no-op implementation of {@link RxJavaSingleExecutionHook}
20+
*/
21+
class RxJavaSingleExecutionHookDefault extends RxJavaSingleExecutionHook {
22+
23+
private static final RxJavaSingleExecutionHookDefault INSTANCE = new RxJavaSingleExecutionHookDefault();
24+
25+
public static RxJavaSingleExecutionHook getInstance() {
26+
return INSTANCE;
27+
}
28+
}

0 commit comments

Comments
 (0)