Skip to content

Commit 971d209

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
flatMap full rewrite.
1 parent 2bf39a7 commit 971d209

14 files changed

+1399
-1112
lines changed

src/main/java/rx/Observable.java

Lines changed: 127 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1635,8 +1635,9 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
16351635
* {@code source} Observable
16361636
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16371637
*/
1638+
@SuppressWarnings({"unchecked", "rawtypes"})
16381639
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
1639-
return source.lift(OperatorMerge.<T>instance(false));
1640+
return source.flatMap((Func1)UtilityFunctions.identity());
16401641
}
16411642

16421643
/**
@@ -1663,8 +1664,10 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
16631664
* if {@code maxConcurrent} is less than or equal to 0
16641665
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
16651666
*/
1667+
@Experimental
1668+
@SuppressWarnings({"unchecked", "rawtypes"})
16661669
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
1667-
return source.lift(new OperatorMergeMaxConcurrent<T>(maxConcurrent));
1670+
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent);
16681671
}
16691672

16701673
/**
@@ -1935,7 +1938,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
19351938
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
19361939
return merge(from(sequences));
19371940
}
1938-
1941+
1942+
/**
1943+
* Flattens an Array of Observables into one Observable, without any transformation, while limiting the
1944+
* number of concurrent subscriptions to these Observables.
1945+
* <p>
1946+
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
1947+
* <p>
1948+
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
1949+
* using the {@code merge} method.
1950+
* <dl>
1951+
* <dt><b>Scheduler:</b></dt>
1952+
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
1953+
* </dl>
1954+
*
1955+
* @param sequences
1956+
* the Array of Observables
1957+
* @param maxConcurrent
1958+
* the maximum number of Observables that may be subscribed to concurrently
1959+
* @return an Observable that emits all of the items emitted by the Observables in the Array
1960+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1961+
*/
1962+
@Experimental
1963+
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
1964+
return merge(from(sequences), maxConcurrent);
1965+
}
19391966
/**
19401967
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
19411968
* receive all successfully emitted items from all of the source Observables without being interrupted by
@@ -1960,8 +1987,41 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
19601987
* {@code source} Observable
19611988
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
19621989
*/
1990+
@SuppressWarnings({"unchecked", "rawtypes"})
19631991
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
1964-
return source.lift(OperatorMerge.<T>instance(true));
1992+
return source.flatMap((Func1)UtilityFunctions.identity(), Integer.MAX_VALUE, true);
1993+
}
1994+
/**
1995+
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
1996+
* receive all successfully emitted items from all of the source Observables without being interrupted by
1997+
* an error notification from one of them, while limiting the
1998+
* number of concurrent subscriptions to these Observables.
1999+
* <p>
2000+
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
2001+
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
2002+
* error notification until all of the merged Observables have finished emitting items.
2003+
* <p>
2004+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
2005+
* <p>
2006+
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
2007+
* invoke the {@code onError} method of its Observers once.
2008+
* <dl>
2009+
* <dt><b>Scheduler:</b></dt>
2010+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
2011+
* </dl>
2012+
*
2013+
* @param source
2014+
* an Observable that emits Observables
2015+
* @param maxConcurrent
2016+
* the maximum number of Observables that may be subscribed to concurrently
2017+
* @return an Observable that emits all of the items emitted by the Observables emitted by the
2018+
* {@code source} Observable
2019+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
2020+
*/
2021+
@Experimental
2022+
@SuppressWarnings({"unchecked", "rawtypes"})
2023+
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
2024+
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent, true);
19652025
}
19662026

19672027
/**
@@ -4558,7 +4618,7 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
45584618
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
45594619
*/
45604620
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
4561-
return merge(map(func));
4621+
return lift(OperatorMerge.instance(false, Integer.MAX_VALUE, func));
45624622
}
45634623

45644624
/**
@@ -4586,7 +4646,67 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
45864646
*/
45874647
@Beta
45884648
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
4589-
return merge(map(func), maxConcurrent);
4649+
return flatMap(func, maxConcurrent, false);
4650+
}
4651+
4652+
/**
4653+
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
4654+
* by the source Observable, where that function returns an Observable, and then merging those resulting
4655+
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
4656+
* subscriptions to these Observables.
4657+
* <p>
4658+
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
4659+
* <dl>
4660+
* <dt><b>Scheduler:</b></dt>
4661+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
4662+
* </dl>
4663+
*
4664+
* @param func
4665+
* a function that, when applied to an item emitted by the source Observable, returns an
4666+
* Observable
4667+
* @param maxConcurrent
4668+
* the maximum number of Observables that may be subscribed to concurrently
4669+
* @param delayErrors
4670+
* should the errors delayed until all sources have terminated in some way
4671+
* @return an Observable that emits the result of applying the transformation function to each item emitted
4672+
* by the source Observable and merging the results of the Observables obtained from this
4673+
* transformation
4674+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
4675+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
4676+
*/
4677+
@Beta
4678+
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent, boolean delayErrors) {
4679+
return lift(OperatorMerge.instance(delayErrors, maxConcurrent, func));
4680+
}
4681+
4682+
/**
4683+
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
4684+
* by the source Observable, where that function returns an Observable, and then merging those resulting
4685+
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
4686+
* subscriptions to these Observables.
4687+
* <p>
4688+
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
4689+
* <dl>
4690+
* <dt><b>Scheduler:</b></dt>
4691+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
4692+
* </dl>
4693+
*
4694+
* @param func
4695+
* a function that, when applied to an item emitted by the source Observable, returns an
4696+
* Observable
4697+
* @param maxConcurrent
4698+
* the maximum number of Observables that may be subscribed to concurrently
4699+
* @param delayErrors
4700+
* should the errors delayed until all sources have terminated in some way
4701+
* @return an Observable that emits the result of applying the transformation function to each item emitted
4702+
* by the source Observable and merging the results of the Observables obtained from this
4703+
* transformation
4704+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
4705+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
4706+
*/
4707+
@Beta
4708+
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, boolean delayErrors) {
4709+
return lift(OperatorMerge.instance(delayErrors, Integer.MAX_VALUE, func));
45904710
}
45914711

45924712
/**
@@ -4735,7 +4855,7 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
47354855
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
47364856
*/
47374857
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
4738-
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
4858+
return flatMap(OperatorMapPair.convertSelector(collectionSelector));
47394859
}
47404860

47414861
/**

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,4 +77,4 @@ public void onCompleted() {
7777
child.setProducer(producer);
7878
return s;
7979
}
80-
}
80+
}

0 commit comments

Comments
 (0)