Skip to content

Commit ff3c5d0

Browse files
authored
2.x: add new methods to Maybe, Observable and Single from 4481 (#4488)
* 2.x: add new methods to Maybe, Observable and Single from 4481 * Fix javadoc mistakes
1 parent 928e437 commit ff3c5d0

File tree

9 files changed

+561
-19
lines changed

9 files changed

+561
-19
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,34 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
700700
// Instance methods
701701
// ------------------------------------------------------------------
702702

703+
/**
704+
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
705+
* null if completed or an exception (which is propagated).
706+
* <dl>
707+
* <dt><b>Scheduler:</b></dt>
708+
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
709+
* </dl>
710+
* @return the success value
711+
*/
712+
public T blockingGet() {
713+
return MaybeAwait.get(this, null);
714+
}
715+
716+
/**
717+
* Waits in a blocking fashion until the current Maybe signals a success value (which is returned),
718+
* defaultValue if completed or an exception (which is propagated).
719+
* <dl>
720+
* <dt><b>Scheduler:</b></dt>
721+
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
722+
* </dl>
723+
* @param defaultValue the default item to return if this Maybe is empty
724+
* @return the success value
725+
*/
726+
public T blockingGet(T defaultValue) {
727+
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
728+
return MaybeAwait.get(this, defaultValue);
729+
}
730+
703731
/**
704732
* Casts the success value of the current Maybe into the target type or signals a
705733
* ClassCastException if not compatible.
@@ -996,6 +1024,69 @@ public final <R> Maybe<R> flatMap(
9961024
return new MaybeFlatMapNotification<T, R>(this, onSuccessMapper, onErrorMapper, onCompleteSupplier);
9971025
}
9981026

1027+
/**
1028+
* Returns a Observable that is based on applying a specified function to the item emitted by the source Maybe,
1029+
* where that function returns a ObservableSource.
1030+
* <p>
1031+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.flatMap.png" alt="">
1032+
* <dl>
1033+
* <dt><b>Scheduler:</b></dt>
1034+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
1035+
* </dl>
1036+
*
1037+
* @param <R> the result value type
1038+
* @param mapper
1039+
* a function that, when applied to the item emitted by the source Maybe, returns a ObservableSource
1040+
* @return the Observable returned from {@code func} when applied to the item emitted by the source Maybe
1041+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1042+
*/
1043+
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
1044+
return toObservable().flatMap(mapper);
1045+
}
1046+
1047+
/**
1048+
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
1049+
* source Maybe, where that function returns a Publisher.
1050+
* <p>
1051+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapObservable.png" alt="">
1052+
* <dl>
1053+
* <dt><b>Scheduler:</b></dt>
1054+
* <dd>{@code flatMapObservable} does not operate by default on a particular {@link Scheduler}.</dd>
1055+
* </dl>
1056+
*
1057+
* @param <R> the result value type
1058+
* @param mapper
1059+
* a function that, when applied to the item emitted by the source Maybe, returns an
1060+
* Flowable
1061+
* @return the Flowable returned from {@code func} when applied to the item emitted by the source Maybe
1062+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1063+
*/
1064+
public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publisher<? extends R>> mapper) {
1065+
return toFlowable().flatMap(mapper);
1066+
}
1067+
1068+
/**
1069+
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
1070+
* source {@link Single}, where that function returns a {@link Completable}.
1071+
* <p>
1072+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapCompletable.png" alt="">
1073+
* <dl>
1074+
* <dt><b>Scheduler:</b></dt>
1075+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
1076+
* </dl>
1077+
*
1078+
* @param mapper
1079+
* a function that, when applied to the item emitted by the source Single, returns a
1080+
* Completable
1081+
* @return the Completable returned from {@code func} when applied to the item emitted by the source Single
1082+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1083+
* @since 2.0
1084+
*/
1085+
public final Completable flatMapCompletable(final Function<? super T, ? extends Completable> mapper) {
1086+
ObjectHelper.requireNonNull(mapper, "mapper is null");
1087+
return RxJavaPlugins.onAssembly(new MaybeFlatMapCompletable<T>(this, mapper));
1088+
}
1089+
9991090
/**
10001091
* Ignores the item emitted by the source Maybe and only calls {@code onCompleted} or {@code onError}.
10011092
* <p>
@@ -1148,6 +1239,21 @@ public final Observable<T> toObservable() {
11481239
return RxJavaPlugins.onAssembly(new MaybeToObservable<T>(this));
11491240
}
11501241

1242+
/**
1243+
* Converts this Maybe into an Single instance composing cancellation
1244+
* through and turing an empty Maybe into a signal of NoSuchElementException.
1245+
* <dl>
1246+
* <dt><b>Scheduler:</b></dt>
1247+
* <dd>{@code create} does not operate by default on a particular {@link Scheduler}.</dd>
1248+
* </dl>
1249+
* @param defaultValue the default item to signal in Single if this Maybe is empty
1250+
* @return the new Single instance
1251+
*/
1252+
public final Single<T> toSingle(T defaultValue) {
1253+
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
1254+
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultValue));
1255+
}
1256+
11511257
/**
11521258
* Converts this Maybe into an Single instance composing cancellation
11531259
* through and turing an empty Maybe into a signal of NoSuchElementException.
@@ -1158,7 +1264,7 @@ public final Observable<T> toObservable() {
11581264
* @return the new Single instance
11591265
*/
11601266
public final Single<T> toSingle() {
1161-
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this));
1267+
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, null));
11621268
}
11631269

11641270
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11499,7 +11499,7 @@ public final Maybe<T> toMaybe() {
1149911499
/**
1150011500
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
1150111501
* emits only a single item. If the source ObservableSource emits more than one item or no items, notify of an
11502-
* {@code IllegalArgumentException} or {@code NoSuchElementException} respectively.
11502+
* {@code IndexOutOfBoundsException} or {@code NoSuchElementException} respectively.
1150311503
* <p>
1150411504
* <img width="640" height="295" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toSingle.png" alt="">
1150511505
* <dl>
@@ -11508,18 +11508,35 @@ public final Maybe<T> toMaybe() {
1150811508
* </dl>
1150911509
*
1151011510
* @return a Single that emits the single item emitted by the source ObservableSource
11511-
* @throws IllegalArgumentException
11512-
* if the source ObservableSource emits more than one item
11513-
* @throws NoSuchElementException
11514-
* if the source ObservableSource emits no items
1151511511
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
1151611512
* @since 2.0
1151711513
*/
1151811514
@SchedulerSupport(SchedulerSupport.NONE)
1151911515
public final Single<T> toSingle() {
11520-
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this));
11516+
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this, null));
1152111517
}
1152211518

11519+
/**
11520+
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
11521+
* emits only a single item or emits the given defaultIfEmpty value if the ObservableSource is empty.
11522+
* If the source ObservableSource emits more than one item a {@link IndexOutOfBoundsException} is signalled.
11523+
* <p>
11524+
* <img width="640" height="295" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toSingle.png" alt="">
11525+
* <dl>
11526+
* <dt><b>Scheduler:</b></dt>
11527+
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
11528+
* </dl>
11529+
* @param defaultIfEmpty the value the Single will signal if this Observable is empty
11530+
* @return a Single that emits the single item emitted by the source ObservableSource
11531+
* @see <a href="http://reactivex.io/documentation/single.html">ReactiveX documentation: Single</a>
11532+
* @since 2.0
11533+
*/
11534+
@SchedulerSupport(SchedulerSupport.NONE)
11535+
public final Single<T> toSingle(T defaultIfEmpty) {
11536+
ObjectHelper.requireNonNull(defaultIfEmpty, "defaultIfEmpty is null");
11537+
return RxJavaPlugins.onAssembly(new SingleFromObservable<T>(this, defaultIfEmpty));
11538+
}
11539+
1152311540
/**
1152411541
* Returns an Observable that emits a list that contains the items emitted by the source ObservableSource, in a
1152511542
* sorted order. Each item emitted by the ObservableSource must implement {@link Comparable} with respect to all

src/main/java/io/reactivex/Single.java

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.internal.operators.completable.*;
2727
import io.reactivex.internal.operators.flowable.*;
2828
import io.reactivex.internal.operators.maybe.*;
29+
import io.reactivex.internal.operators.observable.ObservableConcatMap;
2930
import io.reactivex.internal.operators.single.*;
3031
import io.reactivex.internal.subscribers.single.*;
3132
import io.reactivex.internal.util.*;
@@ -113,6 +114,23 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
113114
return concat(Flowable.fromIterable(sources));
114115
}
115116

117+
/**
118+
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
119+
* a Publisher sequence.
120+
* <dl>
121+
* <dt><b>Scheduler:</b></dt>
122+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
123+
* </dl>
124+
* @param <T> the value type
125+
* @param sources the Publisher of SingleSource instances
126+
* @return the new Flowable instance
127+
* @since 2.0
128+
*/
129+
@SuppressWarnings({ "unchecked", "rawtypes" })
130+
public static <T> Observable<T> concat(Observable<? extends SingleSource<? extends T>> sources) {
131+
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
132+
}
133+
116134
/**
117135
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
118136
* a Publisher sequence.
@@ -1705,6 +1723,26 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
17051723
return toFlowable().flatMap(mapper);
17061724
}
17071725

1726+
/**
1727+
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
1728+
* where that function returns a SingleSource.
1729+
* <p>
1730+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
1731+
* <dl>
1732+
* <dt><b>Scheduler:</b></dt>
1733+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
1734+
* </dl>
1735+
*
1736+
* @param <R> the result value type
1737+
* @param mapper
1738+
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
1739+
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
1740+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1741+
*/
1742+
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
1743+
return toObservable().flatMap(mapper);
1744+
}
1745+
17081746
/**
17091747
* Returns a {@link Completable} that completes based on applying a specified function to the item emitted by the
17101748
* source {@link Single}, where that function returns a {@link Completable}.
@@ -2483,6 +2521,21 @@ public final Flowable<T> toFlowable() {
24832521
return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
24842522
}
24852523

2524+
/**
2525+
* Returns a {@link Future} representing the single value emitted by this {@code Single}.
2526+
* <p>
2527+
* <img width="640" height="395" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.toFuture.png" alt="">
2528+
* <dl>
2529+
* <dt><b>Scheduler:</b></dt>
2530+
* <dd>{@code toFuture} does not operate by default on a particular {@link Scheduler}.</dd>
2531+
* </dl>
2532+
*
2533+
* @return a {@link Future} that expects a single item to be emitted by this {@code Single}
2534+
* @see <a href="http://reactivex.io/documentation/operators/to.html">ReactiveX documentation: To</a>
2535+
*/
2536+
public final Future<T> toFuture() {
2537+
return subscribeWith(new FutureSingleObserver<T>());
2538+
}
24862539

24872540
/**
24882541
* Converts this Single into a {@link Maybe}.
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import java.util.NoSuchElementException;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.util.ExceptionHelper;
23+
24+
public enum MaybeAwait {
25+
;
26+
27+
public static <T> T get(MaybeSource<T> source, final T defaultValue) {
28+
final AtomicReference<T> valueRef = new AtomicReference<T>();
29+
final AtomicReference<Throwable> errorRef = new AtomicReference<Throwable>();
30+
final CountDownLatch cdl = new CountDownLatch(1);
31+
32+
source.subscribe(new MaybeObserver<T>() {
33+
@Override
34+
public void onError(Throwable e) {
35+
errorRef.lazySet(e);
36+
cdl.countDown();
37+
}
38+
39+
@Override
40+
public void onSubscribe(Disposable d) {
41+
}
42+
43+
@Override
44+
public void onSuccess(T value) {
45+
valueRef.lazySet(value);
46+
cdl.countDown();
47+
}
48+
49+
@Override
50+
public void onComplete() {
51+
if (defaultValue != null) {
52+
valueRef.lazySet(defaultValue);
53+
} else {
54+
errorRef.lazySet(new NoSuchElementException());
55+
}
56+
cdl.countDown();
57+
}
58+
});
59+
60+
if (cdl.getCount() != 0L) {
61+
try {
62+
cdl.await();
63+
} catch (InterruptedException ex) {
64+
throw new IllegalStateException(ex);
65+
}
66+
}
67+
Throwable e = errorRef.get();
68+
if (e != null) {
69+
throw ExceptionHelper.wrapOrThrow(e);
70+
}
71+
return valueRef.get();
72+
}
73+
}

0 commit comments

Comments
 (0)