Skip to content

1.x: add RxJavaHooks tests, fix small bugs #4142

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 26 additions & 7 deletions src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ public interface CompletableTransformer extends Func1<Completable, Completable>
}

/** Single instance of a complete Completable. */
static final Completable COMPLETE = create(new CompletableOnSubscribe() {
static final Completable COMPLETE = new Completable(new CompletableOnSubscribe() {
@Override
public void call(CompletableSubscriber s) {
s.onSubscribe(Subscriptions.unsubscribed());
s.onCompleted();
}
});
}, true); // hook is handled in complete()

/** Single instance of a never Completable. */
static final Completable NEVER = create(new CompletableOnSubscribe() {
static final Completable NEVER = new Completable(new CompletableOnSubscribe() {
@Override
public void call(CompletableSubscriber s) {
s.onSubscribe(Subscriptions.unsubscribed());
}
});
}, true); // hook is handled in never()

/**
* Returns a Completable which terminates as soon as one of the source Completables
Expand Down Expand Up @@ -311,7 +311,11 @@ public void onSubscribe(Subscription d) {
* @return a Completable instance that completes immediately
*/
public static Completable complete() {
return COMPLETE;
CompletableOnSubscribe cos = RxJavaHooks.onCreate(COMPLETE.onSubscribe);
if (cos == COMPLETE.onSubscribe) {
return COMPLETE;
}
return new Completable(cos, true);
}

/**
Expand Down Expand Up @@ -734,7 +738,11 @@ public static Completable mergeDelayError(Observable<? extends Completable> sour
* @return the singleton instance that never calls onError or onComplete
*/
public static Completable never() {
return NEVER;
CompletableOnSubscribe cos = RxJavaHooks.onCreate(NEVER.onSubscribe);
if (cos == NEVER.onSubscribe) {
return NEVER;
}
return new Completable(cos, true);
}

/**
Expand Down Expand Up @@ -975,7 +983,18 @@ public void call() {
protected Completable(CompletableOnSubscribe onSubscribe) {
this.onSubscribe = RxJavaHooks.onCreate(onSubscribe);
}


/**
* Constructs a Completable instance with the given onSubscribe callback without calling the onCreate
* hook.
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
* not null (not verified)
* @param useHook if false, RxJavaHooks.onCreate won't be called
*/
private Completable(CompletableOnSubscribe onSubscribe, boolean useHook) {
this.onSubscribe = useHook ? RxJavaHooks.onCreate(onSubscribe) : onSubscribe;
}

/**
* Returns a Completable that emits the a terminated event of either this Completable
* or the other Completable whichever fires first.
Expand Down
6 changes: 5 additions & 1 deletion src/main/java/rx/plugins/RxJavaHooks.java
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ public static void reset() {
return;
}
init();

onComputationScheduler = null;
onIOScheduler = null;
onNewThreadScheduler = null;
}

/**
Expand Down Expand Up @@ -224,7 +228,7 @@ public static void onError(Throwable ex) {
Action1<Throwable> f = onError;
if (f != null) {
try {
f.call(ex);
f.call(ex);
return;
} catch (Throwable pluginException) {
/*
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public static Scheduler computation() {
* @return a {@link Scheduler} meant for IO-bound work
*/
public static Scheduler io() {
return RxJavaHooks.onComputationScheduler(getInstance().ioScheduler);
return RxJavaHooks.onIOScheduler(getInstance().ioScheduler);
}

/**
Expand Down
Loading