Skip to content

single: add toSingle method to Observable #3049

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 24, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,28 @@ public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer
public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
// cover for generics insanity
}



/**
* Returns a Single that emits the single item emitted by the source Observable, if that Observable
* emits only a single item. If the source Observable emits more than one item or no items, notify of an
* {@code IllegalArgumentException} or {@code NoSuchElementException} respectively.
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a Single that emits the single item emitted by the source Observable
* @throws IllegalArgumentException
* if the source observable emits more than one item
* @throws NoSuchElementException
* if the source observable emits no items
*/
@Experimental
public Single<T> toSingle() {
return new Single<T>(OnSubscribeSingle.create(this));
}


/* *********************************************************************************************************
* Operators Below Here
Expand Down
89 changes: 89 additions & 0 deletions src/main/java/rx/internal/operators/OnSubscribeSingle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import rx.Observable;
import rx.Single;
import rx.SingleSubscriber;
import rx.Subscriber;

import java.util.NoSuchElementException;

/**
* Allows conversion of an Observable to a Single ensuring that exactly one item is emitted - no more and no less.
* Also forwards errors as appropriate.
*/
public class OnSubscribeSingle<T> implements Single.OnSubscribe<T> {

private final Observable<T> observable;

public OnSubscribeSingle(Observable<T> observable) {
this.observable = observable;
}

@Override
public void call(final SingleSubscriber<? super T> child) {
Subscriber<T> parent = new Subscriber<T>() {
private boolean emittedTooMany = false;
private boolean itemEmitted = false;
private T emission = null;

@Override
public void onStart() {
// We request 2 here since we need 1 for the single and 1 to check that the observable
// doesn't emit more than one item
request(2);
}

@Override
public void onCompleted() {
if (emittedTooMany) {
// Don't need to do anything here since we already sent an error downstream
} else {
if (itemEmitted) {
child.onSuccess(emission);
} else {
child.onError(new NoSuchElementException("Observable emitted no items"));
}
}
}

@Override
public void onError(Throwable e) {
child.onError(e);
unsubscribe();
}

@Override
public void onNext(T t) {
if (itemEmitted) {
emittedTooMany = true;
child.onError(new IllegalArgumentException("Observable emitted too many elements"));
unsubscribe();
} else {
itemEmitted = true;
emission = t;
}
}
};
child.add(parent);
observable.subscribe(parent);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjchristensen Shouldn't this use unsafeSubscribe?

}

public static <T> OnSubscribeSingle<T> create(Observable<T> observable) {
return new OnSubscribeSingle<T>(observable);
}
}
73 changes: 73 additions & 0 deletions src/test/java/rx/internal/operators/OnSubscribeSingleTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import org.junit.Test;
import rx.Observable;
import rx.Single;
import rx.observers.TestSubscriber;

import java.util.Collections;
import java.util.NoSuchElementException;

public class OnSubscribeSingleTest {

@Test
public void testJustSingleItemObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Observable.just("Hello World!").toSingle();
single.subscribe(subscriber);

subscriber.assertReceivedOnNext(Collections.singletonList("Hello World!"));
}

@Test
public void testErrorObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
IllegalArgumentException error = new IllegalArgumentException("Error");
Single<String> single = Observable.<String>error(error).toSingle();
single.subscribe(subscriber);

subscriber.assertError(error);
}

@Test
public void testJustTwoEmissionsObservableThrowsError() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Observable.just("First", "Second").toSingle();
single.subscribe(subscriber);

subscriber.assertError(IllegalArgumentException.class);
}

@Test
public void testEmptyObservable() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Observable.<String>empty().toSingle();
single.subscribe(subscriber);

subscriber.assertError(NoSuchElementException.class);
}

@Test
public void testRepeatObservableThrowsError() {
TestSubscriber<String> subscriber = TestSubscriber.create();
Single<String> single = Observable.just("First", "Second").repeat().toSingle();
single.subscribe(subscriber);

subscriber.assertError(IllegalArgumentException.class);
}
}