Skip to content

2.x: add new methods to Maybe, Observable and Single from 4481 #4488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 6, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 107 additions & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,34 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
// Instance methods
// ------------------------------------------------------------------

/**
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
* null if completed or an exception (which is propagated).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @return the success value
*/
public T blockingGet() {
return MaybeAwait.get(this, null);
}

/**
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
* defaultValue if completed or an exception (which is propagated).
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultValue the default item to return if this Maybe is empty
* @return the success value
*/
public T blockingGet(T defaultValue) {
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
return MaybeAwait.get(this, defaultValue);
}

/**
* Casts the success value of the current Maybe into the target type or signals a
* ClassCastException if not compatible.
Expand Down Expand Up @@ -996,6 +1024,69 @@ public final <R> Maybe<R> flatMap(
return new MaybeFlatMapNotification<T, R>(this, onSuccessMapper, onErrorMapper, onCompleteSupplier);
}

/**
* Returns a Observable that is based on applying a specified function to the item emitted by the source Maybe,
* where that function returns a ObservableSource.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Maybe, returns a ObservableSource
* @return the Observable returned from {@code func} when applied to the item emitted by the source Maybe
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return toObservable().flatMap(mapper);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this also be redone later in the same way flatMapCompletable works now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this could receive a custom implementation.

}

/**
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
* source Maybe, where that function returns a Publisher.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapObservable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Maybe, returns an
* Flowable
* @return the Flowable returned from {@code func} when applied to the item emitted by the source Maybe
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
return toFlowable().flatMap(mapper);
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* source {@link Single}, where that function returns a {@link Completable}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapCompletable.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a
* Completable
* @return the Completable returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since 2.0
*/
public final Completable flatMapCompletable(final Function<? super T, ? extends Completable> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new MaybeFlatMapCompletable<T>(this, mapper));
}

/**
* Ignores the item emitted by the source Maybe and only calls {@code onCompleted} or {@code onError}.
* <p>
Expand Down Expand Up @@ -1148,6 +1239,21 @@ public final Observable<T> toObservable() {
return RxJavaPlugins.onAssembly(new MaybeToObservable<T>(this));
}

/**
* Converts this Maybe into an Single instance composing cancellation
* through and turing an empty Maybe into a signal of NoSuchElementException.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultValue the default item to signal in Single if this Maybe is empty
* @return the new Single instance
*/
public final Single<T> toSingle(T defaultValue) {
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultValue));
}

/**
* Converts this Maybe into an Single instance composing cancellation
* through and turing an empty Maybe into a signal of NoSuchElementException.
Expand All @@ -1158,7 +1264,7 @@ public final Observable<T> toObservable() {
* @return the new Single instance
*/
public final Single<T> toSingle() {
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this));
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, null));
}

/**
Expand Down
29 changes: 23 additions & 6 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11499,7 +11499,7 @@ public final Maybe<T> toMaybe() {
/**
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item. If the source ObservableSource emits more than one item or no items, notify of an
* {@code IllegalArgumentException} or {@code NoSuchElementException} respectively.
* {@code IndexOutOfBoundsException} or {@code NoSuchElementException} respectively.
* <p>
* <img width="640" height="295" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toSingle.png" alt="">
* <dl>
Expand All @@ -11508,18 +11508,35 @@ public final Maybe<T> toMaybe() {
* </dl>
*
* @return a Single that emits the single item emitted by the source ObservableSource
* @throws IllegalArgumentException
* if the source ObservableSource emits more than one item
* @throws NoSuchElementException
* if the source ObservableSource emits no items
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> toSingle() {
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this));
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this, null));
}

/**
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
* emits only a single item or emits the given defaultIfEmpty value if the ObservableSource is empty.
* If the source ObservableSource emits more than one item a {@link IndexOutOfBoundsException} is signalled.
* <p>
* <img width="640" height="295" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toSingle.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param defaultIfEmpty the value the Single will signal if this Observable is empty
* @return a Single that emits the single item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
* @since 2.0
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> toSingle(T defaultIfEmpty) {
ObjectHelper.requireNonNull(defaultIfEmpty, "defaultIfEmpty is null");
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this, defaultIfEmpty));
}

/**
* Returns an Observable that emits a list that contains the items emitted by the source ObservableSource, in a
* sorted order. Each item emitted by the ObservableSource must implement {@link Comparable} with respect to all
Expand Down
53 changes: 53 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.observable.ObservableConcatMap;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.subscribers.single.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -113,6 +114,23 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
return concat(Flowable.fromIterable(sources));
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* a Publisher sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Publisher of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Observable<T> concat(Observable<? extends SingleSource<? extends T>> sources) {
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* a Publisher sequence.
Expand Down Expand Up @@ -1705,6 +1723,26 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
return toFlowable().flatMap(mapper);
}

/**
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
* where that function returns a SingleSource.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return toObservable().flatMap(mapper);
}

/**
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
* source {@link Single}, where that function returns a {@link Completable}.
Expand Down Expand Up @@ -2483,6 +2521,21 @@ public final Flowable<T> toFlowable() {
return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
}

/**
* Returns a {@link Future} representing the single value emitted by this {@code Single}.
* <p>
* <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @return a {@link Future} that expects a single item to be emitted by this {@code Single}
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
*/
public final Future<T> toFuture() {
return subscribeWith(new FutureSingleObserver<T>());
}

/**
* Converts this Single into a {@link Maybe}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.util.ExceptionHelper;

public enum MaybeAwait {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasn't the conclusion to not use enums?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll clean up these separately as the algorithms needs to be compacted. Until then, this will show up in the coverage to bother me :)

;

public static <T> T get(MaybeSource<T> source, final T defaultValue) {
final AtomicReference<T> valueRef = new AtomicReference<T>();
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
final CountDownLatch cdl = new CountDownLatch(1);

source.subscribe(new MaybeObserver<T>() {
@Override
public void onError(Throwable e) {
errorRef.lazySet(e);
cdl.countDown();
}

@Override
public void onSubscribe(Disposable d) {
}

@Override
public void onSuccess(T value) {
valueRef.lazySet(value);
cdl.countDown();
}

@Override
public void onComplete() {
if (defaultValue != null) {
valueRef.lazySet(defaultValue);
} else {
errorRef.lazySet(new NoSuchElementException());
}
cdl.countDown();
}
});

if (cdl.getCount() != 0L) {
try {
cdl.await();
} catch (InterruptedException ex) {
throw new IllegalStateException(ex);
}
}
Throwable e = errorRef.get();
if (e != null) {
throw ExceptionHelper.wrapOrThrow(e);
}
return valueRef.get();
}
}
Loading