Skip to content

Operator flat map rewrite #3010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 127 additions & 7 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1635,8 +1635,9 @@ public final static <T> Observable<T> merge(Iterable<? extends Observable<? exte
* {@code source} Observable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(false));
return source.flatMap((Func1)UtilityFunctions.identity());
}

/**
Expand All @@ -1663,8 +1664,10 @@ public final static <T> Observable<T> merge(Observable<? extends Observable<? ex
* if {@code maxConcurrent} is less than or equal to 0
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return source.lift(new OperatorMergeMaxConcurrent<T>(maxConcurrent));
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent);
}

/**
Expand Down Expand Up @@ -1935,7 +1938,31 @@ public final static <T> Observable<T> merge(Observable<? extends T> t1, Observab
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences) {
return merge(from(sequences));
}


/**
* Flattens an Array of Observables into one Observable, without any transformation, while limiting the
* number of concurrent subscriptions to these Observables.
* <p>
* <img width="640" height="370" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/merge.io.png" alt="">
* <p>
* You can combine items emitted by multiple Observables so that they appear as a single Observable, by
* using the {@code merge} method.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param sequences
* the Array of Observables
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the Observables in the Array
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
public final static <T> Observable<T> merge(Observable<? extends T>[] sequences, int maxConcurrent) {
return merge(from(sequences), maxConcurrent);
}
/**
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Observables without being interrupted by
Expand All @@ -1960,8 +1987,41 @@ public final static <T> Observable<T> merge(Observable<? extends T>[] sequences)
* {@code source} Observable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source) {
return source.lift(OperatorMerge.<T>instance(true));
return source.flatMap((Func1)UtilityFunctions.identity(), Integer.MAX_VALUE, true);
}
/**
* Flattens an Observable that emits Observables into one Observable, in a way that allows an Observer to
* receive all successfully emitted items from all of the source Observables without being interrupted by
* an error notification from one of them, while limiting the
* number of concurrent subscriptions to these Observables.
* <p>
* This behaves like {@link #merge(Observable)} except that if any of the merged Observables notify of an
* error via {@link Observer#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error notification until all of the merged Observables have finished emitting items.
* <p>
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
* <p>
* Even if multiple merged Observables send {@code onError} notifications, {@code mergeDelayError} will only
* invoke the {@code onError} method of its Observers once.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param source
* an Observable that emits Observables
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @return an Observable that emits all of the items emitted by the Observables emitted by the
* {@code source} Observable
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@Experimental
@SuppressWarnings({"unchecked", "rawtypes"})
public final static <T> Observable<T> mergeDelayError(Observable<? extends Observable<? extends T>> source, int maxConcurrent) {
return source.flatMap((Func1)UtilityFunctions.identity(), maxConcurrent, true);
}

/**
Expand Down Expand Up @@ -4558,7 +4618,7 @@ public final Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boole
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
return merge(map(func));
return lift(OperatorMerge.instance(false, Integer.MAX_VALUE, func));
}

/**
Expand Down Expand Up @@ -4586,7 +4646,67 @@ public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? e
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent) {
return merge(map(func), maxConcurrent);
return flatMap(func, maxConcurrent, false);
}

/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source Observable, where that function returns an Observable, and then merging those resulting
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* a function that, when applied to an item emitted by the source Observable, returns an
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @param delayErrors
* should the errors delayed until all sources have terminated in some way
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source Observable and merging the results of the Observables obtained from this
* transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, int maxConcurrent, boolean delayErrors) {
return lift(OperatorMerge.instance(delayErrors, maxConcurrent, func));
}

/**
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
* by the source Observable, where that function returns an Observable, and then merging those resulting
* Observables and emitting the results of this merger, while limiting the maximum number of concurrent
* subscriptions to these Observables.
* <p>
* <!-- <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/flatMap.png" alt=""> -->
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param func
* a function that, when applied to an item emitted by the source Observable, returns an
* Observable
* @param maxConcurrent
* the maximum number of Observables that may be subscribed to concurrently
* @param delayErrors
* should the errors delayed until all sources have terminated in some way
* @return an Observable that emits the result of applying the transformation function to each item emitted
* by the source Observable and merging the results of the Observables obtained from this
* transformation
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Beta
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func, boolean delayErrors) {
return lift(OperatorMerge.instance(delayErrors, Integer.MAX_VALUE, func));
}

/**
Expand Down Expand Up @@ -4735,7 +4855,7 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector) {
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
return flatMap(OperatorMapPair.convertSelector(collectionSelector));
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,4 @@ public void onCompleted() {
child.setProducer(producer);
return s;
}
}
}
Loading