Skip to content

1.x: add Completable.safeSubscribe option + RxJavaPlugins hook support #3942

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 120 additions & 60 deletions src/main/java/rx/Completable.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ public void onSubscribe(Subscription d) {
serial.add(main);
child.add(serial);

other.subscribe(so);
other.unsafeSubscribe(so);

return main;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void next() {
return;
}

c.subscribe(inner);
c.unsafeSubscribe(inner);
}

final class ConcatInnerSubscriber implements CompletableSubscriber {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void next() {
return;
}

a[idx].subscribe(this);
a[idx].unsafeSubscribe(this);
} while (decrementAndGet() != 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void next() {
return;
}

c.subscribe(this);
c.unsafeSubscribe(this);
} while (decrementAndGet() != 0);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void onNext(Completable t) {

wip.getAndIncrement();

t.subscribe(new CompletableSubscriber() {
t.unsafeSubscribe(new CompletableSubscriber() {
Subscription d;
boolean innerDone;
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void call(final CompletableSubscriber s) {
}
}

c.subscribe(new CompletableSubscriber() {
c.unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onSubscribe(Subscription d) {
set.add(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void call(final CompletableSubscriber s) {
continue;
}

c.subscribe(new CompletableSubscriber() {
c.unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onSubscribe(Subscription d) {
set.add(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void call(final CompletableSubscriber s) {

wip.getAndIncrement();

c.subscribe(new CompletableSubscriber() {
c.unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onSubscribe(Subscription d) {
set.add(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void call(final CompletableSubscriber s) {

wip.getAndIncrement();

c.subscribe(new CompletableSubscriber() {
c.unsafeSubscribe(new CompletableSubscriber() {
@Override
public void onSubscribe(Subscription d) {
set.add(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void call() {
if (other == null) {
s.onError(new TimeoutException());
} else {
other.subscribe(new CompletableSubscriber() {
other.unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onSubscribe(Subscription d) {
Expand All @@ -85,7 +85,7 @@ public void onCompleted() {
}
}, timeout, unit);

source.subscribe(new CompletableSubscriber() {
source.unsafeSubscribe(new CompletableSubscriber() {

@Override
public void onSubscribe(Subscription d) {
Expand Down
94 changes: 94 additions & 0 deletions src/main/java/rx/observers/SafeCompletableSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;

import rx.Completable.CompletableSubscriber;
import rx.Subscription;
import rx.annotations.Experimental;
import rx.exceptions.*;
import rx.internal.util.RxJavaPluginUtils;

/**
* Wraps another CompletableSubscriber and handles exceptions thrown
* from onError and onCompleted.
*
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public final class SafeCompletableSubscriber implements CompletableSubscriber, Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental

final CompletableSubscriber actual;

Subscription s;

boolean done;

public SafeCompletableSubscriber(CompletableSubscriber actual) {
this.actual = actual;
}

@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
try {
actual.onCompleted();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

throw new OnCompletedFailedException(ex);
}
}

@Override
public void onError(Throwable e) {
RxJavaPluginUtils.handleException(e);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be done if !done, as in SafeSubscriber.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done or not, the exception has to be reported to the handler. This also makes your test pass.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why SafeSubscriber doesn't do this? Either this class or the other one are not behaving right.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you demonstrate that with an unit test?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want me to make a unit test for the case that onError gets called twice in a SafeSubcriber and SafeCompletableSubscriber to see if the error handler gets called twice? I don't know if it's worth it. What it happens is that it just doesn't look good to me when behavior is different in Completable from Single and Observable.

if (done) {
return;
}
done = true;
try {
actual.onError(e);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);

throw new OnErrorFailedException(new CompositeException(e, ex));
}
}

@Override
public void onSubscribe(Subscription d) {
this.s = d;
try {
actual.onSubscribe(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.unsubscribe();
onError(ex);
}
}

@Override
public void unsubscribe() {
s.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return done || s.isUnsubscribed();
}
}
104 changes: 104 additions & 0 deletions src/main/java/rx/plugins/RxJavaCompletableExecutionHook.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.plugins;

import rx.*;
import rx.annotations.Experimental;
import rx.functions.Func1;

/**
* Abstract ExecutionHook with invocations at different lifecycle points of {@link Completable} execution with a
* default no-op implementation.
* <p>
* See {@link RxJavaPlugins} or the RxJava GitHub Uncyclo for information on configuring plugins:
* <a href="https://github.com/ReactiveX/RxJava/wiki/Plugins">https://github.com/ReactiveX/RxJava/wiki/Plugins</a>.
* <p>
* <b>Note on thread-safety and performance:</b>
* <p>
* A single implementation of this class will be used globally so methods on this class will be invoked
* concurrently from multiple threads so all functionality must be thread-safe.
* <p>
* Methods are also invoked synchronously and will add to execution time of the completable so all behavior
* should be fast. If anything time-consuming is to be done it should be spawned asynchronously onto separate
* worker threads.
*
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public abstract class RxJavaCompletableExecutionHook {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental

/**
* Invoked during the construction by {@link Completable#create(Completable.CompletableOnSubscribe)}
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass through the function.
*
* @param f
* original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed
* @return {@link rx.Completable.CompletableOnSubscribe} function that can be modified, decorated, replaced or just
* returned as a pass through
*/
public Completable.CompletableOnSubscribe onCreate(Completable.CompletableOnSubscribe f) {
return f;
}

/**
* Invoked before {@link Completable#subscribe(Subscriber)} is about to be executed.
* <p>
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
* logging, metrics and other such things and pass through the function.
*
* @param completableInstance the target completable instance
* @param onSubscribe
* original {@link rx.Completable.CompletableOnSubscribe}<{@code T}> to be executed
* @return {@link rx.Completable.CompletableOnSubscribe}<{@code T}> function that can be modified, decorated, replaced or just
* returned as a pass through
*/
public Completable.CompletableOnSubscribe onSubscribeStart(Completable completableInstance, final Completable.CompletableOnSubscribe onSubscribe) {
// pass through by default
return onSubscribe;
}

/**
* Invoked after failed execution of {@link Completable#subscribe(Subscriber)} with thrown Throwable.
* <p>
* This is <em>not</em> errors emitted via {@link Subscriber#onError(Throwable)} but exceptions thrown when
* attempting to subscribe to a {@link Func1}<{@link Subscriber}{@code <T>}, {@link Subscription}>.
*
* @param e
* Throwable thrown by {@link Completable#subscribe(Subscriber)}
* @return Throwable that can be decorated, replaced or just returned as a pass through
*/
public Throwable onSubscribeError(Throwable e) {
// pass through by default
return e;
}

/**
* Invoked just as the operator functions is called to bind two operations together into a new
* {@link Completable} and the return value is used as the lifted function
* <p>
* This can be used to decorate or replace the {@link rx.Completable.CompletableOperator} instance or just perform extra
* logging, metrics and other such things and pass through the onSubscribe.
*
* @param lift
* original {@link rx.Completable.CompletableOperator}{@code <R, T>}
* @return {@link rx.Completable.CompletableOperator}{@code <R, T>} function that can be modified, decorated, replaced or just
* returned as a pass through
*/
public Completable.CompletableOperator onLift(final Completable.CompletableOperator lift) {
return lift;
}
}
47 changes: 47 additions & 0 deletions src/main/java/rx/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class RxJavaPlugins {
private final AtomicReference<RxJavaErrorHandler> errorHandler = new AtomicReference<RxJavaErrorHandler>();
private final AtomicReference<RxJavaObservableExecutionHook> observableExecutionHook = new AtomicReference<RxJavaObservableExecutionHook>();
private final AtomicReference<RxJavaSingleExecutionHook> singleExecutionHook = new AtomicReference<RxJavaSingleExecutionHook>();
private final AtomicReference<RxJavaCompletableExecutionHook> completableExecutionHook = new AtomicReference<RxJavaCompletableExecutionHook>();
private final AtomicReference<RxJavaSchedulersHook> schedulersHook = new AtomicReference<RxJavaSchedulersHook>();

/**
Expand Down Expand Up @@ -211,6 +212,52 @@ public void registerSingleExecutionHook(RxJavaSingleExecutionHook impl) {
}
}

/**
* Retrieves the instance of {@link RxJavaCompletableExecutionHook} to use based on order of precedence as
* defined in {@link RxJavaPlugins} class header.
* <p>
* Override the default by calling {@link #registerCompletableExecutionHook(RxJavaCompletableExecutionHook)}
* or by setting the property {@code rxjava.plugin.RxJavaCompletableExecutionHook.implementation} with the
* full classname to load.
*
* @return {@link RxJavaCompletableExecutionHook} implementation to use
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public RxJavaCompletableExecutionHook getCompletableExecutionHook() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental

if (completableExecutionHook.get() == null) {
// check for an implementation from System.getProperty first
Object impl = getPluginImplementationViaProperty(RxJavaCompletableExecutionHook.class, System.getProperties());
if (impl == null) {
// nothing set via properties so initialize with default
completableExecutionHook.compareAndSet(null, new RxJavaCompletableExecutionHook() { });
// we don't return from here but call get() again in case of thread-race so the winner will always get returned
} else {
// we received an implementation from the system property so use it
completableExecutionHook.compareAndSet(null, (RxJavaCompletableExecutionHook) impl);
}
}
return completableExecutionHook.get();
}

/**
* Register an {@link RxJavaCompletableExecutionHook} implementation as a global override of any injected or
* default implementations.
*
* @param impl
* {@link RxJavaCompletableExecutionHook} implementation
* @throws IllegalStateException
* if called more than once or after the default was initialized (if usage occurs before trying
* to register)
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public void registerCompletableExecutionHook(RxJavaCompletableExecutionHook impl) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Experimental

if (!completableExecutionHook.compareAndSet(null, impl)) {
throw new IllegalStateException("Another strategy was already registered: " + singleExecutionHook.get());
}
}

/* test */ static Object getPluginImplementationViaProperty(Class<?> pluginClass, Properties propsIn) {
// Make a defensive clone because traversal may fail with ConcurrentModificationException
// if the properties get changed by something outside RxJava.
Expand Down
Loading