Skip to content

Commit 9de559e

Browse files
committed
merge converted to flatMap
1 parent 9af0485 commit 9de559e

File tree

2 files changed

+150
-76
lines changed

2 files changed

+150
-76
lines changed

src/main/java/rx/Observable.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,8 +1635,9 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
16351635
* {@code source} Observable
16361636
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16371637
*/
1638+
@SuppressWarnings({"unchecked", "rawtypes"})
16381639
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1639-
return source.lift(OperatorMerge.<T>instance(false));
1640+
return source.flatMap((Func1)UtilityFunctions.identity());
16401641
}
16411642

16421643
/**
@@ -1664,8 +1665,9 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
16641665
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16651666
*/
16661667
@Experimental
1668+
@SuppressWarnings({"unchecked", "rawtypes"})
16671669
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
1668-
return source.lift(OperatorMerge.<T>instance(false, maxConcurrent));
1670+
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent);
16691671
}
16701672

16711673
/**
@@ -1985,8 +1987,9 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences,
19851987
* {@code source} Observable
19861988
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
19871989
*/
1990+
@SuppressWarnings({"unchecked", "rawtypes"})
19881991
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
1989-
return source.lift(OperatorMerge.<T>instance(true));
1992+
return source.flatMap((Func1)UtilityFunctions.identity(), Integer.MAX_VALUE, true);
19901993
}
19911994
/**
19921995
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
@@ -2016,8 +2019,9 @@ public final static <T> Observable<T> mergeDelayError(Observable<? extends Obser
20162019
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
20172020
*/
20182021
@Experimental
2022+
@SuppressWarnings({"unchecked", "rawtypes"})
20192023
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
2020-
return source.lift(OperatorMerge.<T>instance(true, maxConcurrent));
2024+
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent, true);
20212025
}
20222026

20232027
/**
@@ -4614,7 +4618,7 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
46144618
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
46154619
*/
46164620
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
4617-
return merge(map(func));
4621+
return lift(OperatorMerge.instance(false, Integer.MAX_VALUE, func));
46184622
}
46194623

46204624
/**
@@ -4642,7 +4646,67 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
46424646
*/
46434647
@Beta
46444648
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
4645-
return merge(map(func), maxConcurrent);
4649+
return flatMap(func, maxConcurrent, false);
4650+
}
4651+
4652+
/**
4653+
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
4654+
* by the source Observable, where that function returns an Observable, and then merging those resulting
4655+
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
4656+
* subscriptions to these Observables.
4657+
* <p>
4658+
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
4659+
* <dl>
4660+
* <dt><b>Scheduler:</b></dt>
4661+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
4662+
* </dl>
4663+
*
4664+
* @param func
4665+
* a function that, when applied to an item emitted by the source Observable, returns an
4666+
* Observable
4667+
* @param maxConcurrent
4668+
* the maximum number of Observables that may be subscribed to concurrently
4669+
* @param delayErrors
4670+
* should the errors delayed until all sources have terminated in some way
4671+
* @return an Observable that emits the result of applying the transformation function to each item emitted
4672+
* by the source Observable and merging the results of the Observables obtained from this
4673+
* transformation
4674+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
4675+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
4676+
*/
4677+
@Beta
4678+
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent, boolean delayErrors) {
4679+
return lift(OperatorMerge.instance(delayErrors, maxConcurrent, func));
4680+
}
4681+
4682+
/**
4683+
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
4684+
* by the source Observable, where that function returns an Observable, and then merging those resulting
4685+
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
4686+
* subscriptions to these Observables.
4687+
* <p>
4688+
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
4689+
* <dl>
4690+
* <dt><b>Scheduler:</b></dt>
4691+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
4692+
* </dl>
4693+
*
4694+
* @param func
4695+
* a function that, when applied to an item emitted by the source Observable, returns an
4696+
* Observable
4697+
* @param maxConcurrent
4698+
* the maximum number of Observables that may be subscribed to concurrently
4699+
* @param delayErrors
4700+
* should the errors delayed until all sources have terminated in some way
4701+
* @return an Observable that emits the result of applying the transformation function to each item emitted
4702+
* by the source Observable and merging the results of the Observables obtained from this
4703+
* transformation
4704+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
4705+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
4706+
*/
4707+
@Beta
4708+
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, boolean delayErrors) {
4709+
return lift(OperatorMerge.instance(delayErrors, Integer.MAX_VALUE, func));
46464710
}
46474711

46484712
/**
@@ -4791,7 +4855,7 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
47914855
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
47924856
*/
47934857
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
4794-
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
4858+
return flatMap(OperatorMapPair.convertSelector(collectionSelector));
47954859
}
47964860

47974861
/**

0 commit comments

Comments
 (0)