Skip to content

Commit 1f8c2b3

Browse files
committed
Add maxConcurrent overload to flatMapIterable
1 parent 84ac53e commit 1f8c2b3

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5558,6 +5558,36 @@ public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterab
55585558
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
55595559
}
55605560

5561+
/**
5562+
* Returns an Observable that merges each item emitted by the source Observable with the values in an
5563+
* Iterable corresponding to that item that is generated by a selector, while limiting the number of concurrent
5564+
* subscriptions to these Observables.
5565+
* <p>
5566+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
5567+
* <dl>
5568+
* <dt><b>Scheduler:</b></dt>
5569+
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
5570+
* </dl>
5571+
*
5572+
* @param <R>
5573+
* the type of item emitted by the resulting Observable
5574+
* @param collectionSelector
5575+
* a function that returns an Iterable sequence of values for when given an item emitted by the
5576+
* source Observable
5577+
* @param maxConcurrent
5578+
* the maximum number of Observables that may be subscribed to concurrently
5579+
* @return an Observable that emits the results of merging the items emitted by the source Observable with
5580+
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
5581+
* @throws IllegalArgumentException
5582+
* if {@code maxConcurrent} is less than or equal to 0
5583+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
5584+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5585+
*/
5586+
@Beta
5587+
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector, int maxConcurrent) {
5588+
return merge(map(OperatorMapPair.convertSelector(collectionSelector)), maxConcurrent);
5589+
}
5590+
55615591
/**
55625592
* Returns an Observable that emits the results of applying a function to the pair of values from the source
55635593
* Observable and an Iterable corresponding to that item that is generated by a selector.
@@ -5587,6 +5617,42 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
55875617
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
55885618
}
55895619

5620+
/**
5621+
* Returns an Observable that emits the results of applying a function to the pair of values from the source
5622+
* Observable and an Iterable corresponding to that item that is generated by a selector, while limiting the
5623+
* number of concurrent subscriptions to these Observables.
5624+
* <p>
5625+
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="">
5626+
* <dl>
5627+
* <dt><b>Scheduler:</b></dt>
5628+
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
5629+
* </dl>
5630+
*
5631+
* @param <U>
5632+
* the collection element type
5633+
* @param <R>
5634+
* the type of item emited by the resulting Observable
5635+
* @param collectionSelector
5636+
* a function that returns an Iterable sequence of values for each item emitted by the source
5637+
* Observable
5638+
* @param resultSelector
5639+
* a function that returns an item based on the item emitted by the source Observable and the
5640+
* Iterable returned for that item by the {@code collectionSelector}
5641+
* @param maxConcurrent
5642+
* the maximum number of Observables that may be subscribed to concurrently
5643+
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
5644+
* Observable
5645+
* @throws IllegalArgumentException
5646+
* if {@code maxConcurrent} is less than or equal to 0
5647+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
5648+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5649+
*/
5650+
@Beta
5651+
public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
5652+
Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent) {
5653+
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
5654+
}
5655+
55905656
/**
55915657
* Subscribes to the {@link Observable} and receives notifications for each element.
55925658
* <p>

0 commit comments

Comments
 (0)