Skip to content

1.x: Add extend() for Single and Completable. #4419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,22 @@ public void call(CompletableSubscriber s) {
s.onSubscribe(Subscriptions.unsubscribed());
}
}, false); // hook is handled in never()


/**
* Passes all emitted values from this Completable to the provided conversion function to be collected and
* returned as a single value. Note that it is legal for a conversion function to return a Completable
* (enabling chaining).
*
* @param <R> the output type of the conversion function
* @param conversion a function that converts from the source {@code Completable} to an {@code R}
* @return an instance of R created by the provided conversion function
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public <R> R extend(Func1<? super CompletableOnSubscribe, ? extends R> conversion) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with this is that CompletableOnSubscribe is practucally useless; you'll have to call Completable.create() with it to get meaningful opererations. The whole method should be the same as to in 2.x. The Observable.extend was pushed hard back then disergarding my reservations.

Of course if you can give a compelling case why reduce the options for the conversion, I'm open minded.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, and would much prefer to()'s semantics.

Since extend() is still @Experimental we have that option...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 sounds much better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Closing and will make a new PR, since it might be a day or two.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return conversion.call(new CompletableOnSubscribeExtend(this));
}

/**
* Returns a Completable which terminates as soon as one of the source Completables
* terminates (normally or with an error) and cancels all other Completables.
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/rx/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,21 @@ public interface OnSubscribe<T> extends Action1<SingleSubscriber<? super T>> {
// cover for generics insanity
}

/**
* Passes all emitted values from this Single to the provided conversion function to be collected and
* returned as a single value. Note that it is legal for a conversion function to return a Single
* (enabling chaining).
*
* @param <R> the output type of the conversion function
* @param conversion a function that converts from the source {@code Single<T>} to an {@code R}
* @return an instance of R created by the provided conversion function
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public <R> R extend(Func1<? super OnSubscribe<T>, ? extends R> conversion) {
return conversion.call(new SingleOnSubscribeExtend<T>(this));
}

/**
* Lifts a function to the current Single and returns a new Single that when subscribed to will pass the
* values of the current Single through the Operator function.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import rx.Completable;

/**
* Transforms a CompletableOnSubscribe.call() into an Completable.subscribe() call.
*/
public final class CompletableOnSubscribeExtend implements Completable.CompletableOnSubscribe {
final Completable parent;

public CompletableOnSubscribeExtend(Completable parent) {
this.parent = parent;
}

@Override
public void call(Completable.CompletableSubscriber subscriber) {
parent.subscribe(subscriber);
}
}
35 changes: 35 additions & 0 deletions src/main/java/rx/internal/operators/SingleOnSubscribeExtend.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import rx.*;

/**
* Transforms a Single.OnSubscribe.call() into an Single.subscribe() call.
* @param <T> the value type
*/
public final class SingleOnSubscribeExtend<T> implements Single.OnSubscribe<T> {
final Single<T> parent;

public SingleOnSubscribeExtend(Single<T> parent) {
this.parent = parent;
}

@Override
public void call(SingleSubscriber<? super T> subscriber) {
subscriber.add(parent.subscribe(subscriber));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.*;

import org.junit.Test;

import rx.*;
import rx.Completable.*;
import rx.functions.Func1;

import static org.junit.Assert.*;

public final class CompletableOnSubscribeExtendTest {
@Test
public void convertToBoolean() {
boolean completeWorked = Completable.complete().extend(toBoolean());
assertTrue(completeWorked);

RuntimeException e = new RuntimeException();
boolean errorWorked = Completable.error(e).extend(toBoolean());
assertFalse(errorWorked);
}

private Func1<CompletableOnSubscribe, Boolean> toBoolean() {
return new Func1<CompletableOnSubscribe, Boolean>() {
@Override
public Boolean call(CompletableOnSubscribe onSubscribe) {
final AtomicBoolean worked = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
onSubscribe.call(new CompletableSubscriber() {
@Override
public void onCompleted() {
worked.set(true);
latch.countDown();
}

@Override
public void onError(Throwable e) {
worked.set(false);
latch.countDown();
}

@Override
public void onSubscribe(Subscription d) {
}
});
try {
latch.await();
} catch (InterruptedException e) {
return false;
}
return worked.get();
}
};
}
}
112 changes: 112 additions & 0 deletions src/test/java/rx/internal/operators/SingleOnSubscribeExtendTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.*;
import rx.Single.OnSubscribe;
import rx.functions.Func1;

import static org.junit.Assert.*;

public final class SingleOnSubscribeExtendTest {
@Test
public void convertToEither() {
Either<String> maybeHi = Single.just("Hi").extend(toEither());
assertTrue(maybeHi.hasValue());
assertFalse(maybeHi.hasThrowable());
assertEquals("Hi", maybeHi.value());

RuntimeException e = new RuntimeException();
Either<String> maybeThrowable = Single.<String>error(e).extend(toEither());
assertFalse(maybeThrowable.hasValue());
assertTrue(maybeThrowable.hasThrowable());
assertSame(e, maybeThrowable.throwable());
}

private Func1<OnSubscribe<String>, Either<String>> toEither() {
return new Func1<OnSubscribe<String>, Either<String>>() {
@Override
public Either<String> call(OnSubscribe<String> onSubscribe) {
final AtomicReference<Either<String>> eitherRef = new AtomicReference<Either<String>>();
final CountDownLatch latch = new CountDownLatch(1);
onSubscribe.call(new SingleSubscriber<String>() {
@Override
public void onSuccess(String value) {
eitherRef.set(Either.ofValue(value));
latch.countDown();
}

@Override
public void onError(Throwable error) {
eitherRef.set(Either.<String>ofThrowable(error));
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
return Either.ofThrowable(e);
}
return eitherRef.get();
}
};
}

static final class Either<T> {
static <T> Either<T> ofValue(T value) {
return new Either<T>(value, null);
}

static <T> Either<T> ofThrowable(Throwable throwable) {
return new Either<T>(null, throwable);
}

private final T value;
private final Throwable throwable;

private Either(T value, Throwable throwable) {
this.value = value;
this.throwable = throwable;
}

public boolean hasValue() {
return throwable == null; // Allows null as value.
}

public T value() {
if (throwable != null) {
throw new NullPointerException("No value.");
}
return value;
}

public boolean hasThrowable() {
return throwable != null;
}

public Throwable throwable() {
if (throwable == null) {
throw new NullPointerException("No throwable.");
}
return throwable;
}
}
}