Skip to content

2.x: make any() and all() return Single, patch up tests #4573

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

/**
* Represents a deferred computation without any value but only indication for completion or exception.
Expand Down Expand Up @@ -1804,41 +1804,41 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
// -------------------------------------------------------------------------

/**
* Creates a TestSubscriber and subscribes
* Creates a TestObserver and subscribes
* it to this Completable.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestSubscriber instance
* @return the new TestObserver instance
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<Void> test() {
TestSubscriber<Void> ts = new TestSubscriber<Void>();
subscribe(new SubscriberCompletableObserver<Void>(ts));
public final TestObserver<Void> test() {
TestObserver<Void> ts = new TestObserver<Void>();
subscribe(ts);
return ts;
}

/**
* Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Completable.
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
* Creates a TestObserver optionally in cancelled state, then subscribes it to this Completable.
* @param cancelled if true, the TestObserver will be cancelled before subscribing to this
* Completable.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestSubscriber instance
* @return the new TestObserver instance
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<Void> test(boolean cancelled) {
TestSubscriber<Void> ts = new TestSubscriber<Void>();
public final TestObserver<Void> test(boolean cancelled) {
TestObserver<Void> ts = new TestObserver<Void>();

if (cancelled) {
ts.cancel();
}
subscribe(new SubscriberCompletableObserver<Void>(ts));
subscribe(ts);
return ts;
}
}
16 changes: 8 additions & 8 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4862,15 +4862,15 @@ public static <T, R> Flowable<R> zipIterable(Iterable<? extends Publisher<? exte
*
* @param predicate
* a function that evaluates an item and returns a Boolean
* @return a Flowable that emits {@code true} if all items emitted by the source Publisher satisfy the
* @return a Single that emits {@code true} if all items emitted by the source Publisher satisfy the
* predicate; otherwise, {@code false}
* @see <a href="http://reactivex.io/documentation/operators/all.html">ReactiveX operators documentation: All</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Boolean> all(Predicate<? super T> predicate) {
public final Single<Boolean> all(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new FlowableAll<T>(this, predicate));
return RxJavaPlugins.onAssembly(new FlowableAllSingle<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -4919,15 +4919,15 @@ public final Flowable<T> ambWith(Publisher<? extends T> other) {
*
* @param predicate
* the condition to test items emitted by the source Publisher
* @return a Flowable that emits a Boolean that indicates whether any item emitted by the source
* @return a Single that emits a Boolean that indicates whether any item emitted by the source
* Publisher satisfies the {@code predicate}
* @see <a href="http://reactivex.io/documentation/operators/contains.html">ReactiveX operators documentation: Contains</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Boolean> any(Predicate<? super T> predicate) {
public final Single<Boolean> any(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new FlowableAny<T>(this, predicate));
return RxJavaPlugins.onAssembly(new FlowableAnySingle<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -6660,7 +6660,7 @@ public final Flowable<T> concatWith(Publisher<? extends T> other) {
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Boolean> contains(final Object item) {
public final Single<Boolean> contains(final Object item) {
ObjectHelper.requireNonNull(item, "item is null");
return any(Functions.equalsWith(item));
}
Expand Down Expand Up @@ -8814,7 +8814,7 @@ public final Flowable<T> ignoreElements() {
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<Boolean> isEmpty() {
public final Single<Boolean> isEmpty() {
return all(Functions.alwaysFalse());
}

Expand Down
24 changes: 12 additions & 12 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.util.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

/**
* Represents a deferred computation and emission of a maybe value or exception.
Expand Down Expand Up @@ -3776,40 +3776,40 @@ public final <U, R> Maybe<R> zipWith(MaybeSource<? extends U> other, BiFunction<
// ------------------------------------------------------------------

/**
* Creates a TestSubscriber and subscribes
* Creates a TestObserver and subscribes
* it to this Maybe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestSubscriber instance
* @return the new TestObserver instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<T> test() {
TestSubscriber<T> ts = new TestSubscriber<T>();
toFlowable().subscribe(ts);
public final TestObserver<T> test() {
TestObserver<T> ts = new TestObserver<T>();
subscribe(ts);
return ts;
}

/**
* Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Maybe.
* Creates a TestObserver optionally in cancelled state, then subscribes it to this Maybe.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
* @param cancelled if true, the TestObserver will be cancelled before subscribing to this
* Maybe.
* @return the new TestSubscriber instance
* @return the new TestObserver instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<T> test(boolean cancelled) {
TestSubscriber<T> ts = new TestSubscriber<T>();
public final TestObserver<T> test(boolean cancelled) {
TestObserver<T> ts = new TestObserver<T>();

if (cancelled) {
ts.cancel();
}

toFlowable().subscribe(ts);
subscribe(ts);
return ts;
}
}
20 changes: 10 additions & 10 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4260,14 +4260,14 @@ public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSour
*
* @param predicate
* a function that evaluates an item and returns a Boolean
* @return an Observable that emits {@code true} if all items emitted by the source ObservableSource satisfy the
* @return a Single that emits {@code true} if all items emitted by the source ObservableSource satisfy the
* predicate; otherwise, {@code false}
* @see <a href="http://reactivex.io/documentation/operators/all.html">ReactiveX operators documentation: All</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Boolean> all(Predicate<? super T> predicate) {
public final Single<Boolean> all(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableAll<T>(this, predicate));
return RxJavaPlugins.onAssembly(new ObservableAllSingle<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -4309,14 +4309,14 @@ public final Observable<T> ambWith(ObservableSource<? extends T> other) {
*
* @param predicate
* the condition to test items emitted by the source ObservableSource
* @return an Observable that emits a Boolean that indicates whether any item emitted by the source
* @return a Single that emits a Boolean that indicates whether any item emitted by the source
* ObservableSource satisfies the {@code predicate}
* @see <a href="http://reactivex.io/documentation/operators/contains.html">ReactiveX operators documentation: Contains</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Boolean> any(Predicate<? super T> predicate) {
public final Single<Boolean> any(Predicate<? super T> predicate) {
ObjectHelper.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ObservableAny<T>(this, predicate));
return RxJavaPlugins.onAssembly(new ObservableAnySingle<T>(this, predicate));
}

/**
Expand Down Expand Up @@ -5798,12 +5798,12 @@ public final Observable<T> concatWith(ObservableSource<? extends T> other) {
*
* @param element
* the item to search for in the emissions from the source ObservableSource
* @return an Observable that emits {@code true} if the specified item is emitted by the source ObservableSource,
* @return a Single that emits {@code true} if the specified item is emitted by the source ObservableSource,
* or {@code false} if the source ObservableSource completes without emitting that item
* @see <a href="http://reactivex.io/documentation/operators/contains.html">ReactiveX operators documentation: Contains</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Boolean> contains(final Object element) {
public final Single<Boolean> contains(final Object element) {
ObjectHelper.requireNonNull(element, "element is null");
return any(Functions.equalsWith(element));
}
Expand Down Expand Up @@ -7594,11 +7594,11 @@ public final Observable<T> ignoreElements() {
* <dd>{@code isEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return an Observable that emits a Boolean
* @return a Single that emits a Boolean
* @see <a href="http://reactivex.io/documentation/operators/contains.html">ReactiveX operators documentation: Contains</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<Boolean> isEmpty() {
public final Single<Boolean> isEmpty() {
return all(Functions.alwaysFalse());
}

Expand Down
26 changes: 13 additions & 13 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import java.util.concurrent.*;

import org.reactivestreams.*;
import org.reactivestreams.Publisher;

import io.reactivex.annotations.*;
import io.reactivex.disposables.Disposable;
Expand All @@ -30,9 +30,9 @@
import io.reactivex.internal.operators.observable.ObservableConcatMap;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.util.*;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;

/**
* The Single class implements the Reactive Pattern for a single value response.
Expand Down Expand Up @@ -2806,42 +2806,42 @@ public final <U, R> Single<R> zipWith(SingleSource<U> other, BiFunction<? super
// Fluent test support, super handy and reduces test preparation boilerplate
// -------------------------------------------------------------------------
/**
* Creates a TestSubscriber and subscribes
* Creates a TestObserver and subscribes
* it to this Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the new TestSubscriber instance
* @return the new TestObserver instance
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<T> test() {
TestSubscriber<T> ts = new TestSubscriber<T>();
toFlowable().subscribe(ts);
public final TestObserver<T> test() {
TestObserver<T> ts = new TestObserver<T>();
subscribe(ts);
return ts;
}

/**
* Creates a TestSubscriber optionally in cancelled state, then subscribes it to this Single.
* Creates a TestObserver optionally in cancelled state, then subscribes it to this Single.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code test} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
* @param cancelled if true, the TestObserver will be cancelled before subscribing to this
* Single.
* @return the new TestSubscriber instance
* @return the new TestObserver instance
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final TestSubscriber<T> test(boolean cancelled) {
TestSubscriber<T> ts = new TestSubscriber<T>();
public final TestObserver<T> test(boolean cancelled) {
TestObserver<T> ts = new TestObserver<T>();

if (cancelled) {
ts.cancel();
}

toFlowable().subscribe(ts);
subscribe(ts);
return ts;
}
}
Loading