Skip to content

Commit 2343989

Browse files
committed
Merge pull request #3722 from lukaciko/flatMapIterable-maxConcurrent
Add maxConcurrent parameter to flatMapIterable
2 parents 0f7c557 + 1f8c2b3 commit 2343989

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5652,6 +5652,36 @@ public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterab
56525652
return merge(map(OperatorMapPair.convertSelector(collectionSelector)));
56535653
}
56545654

5655+
/**
5656+
* Returns an Observable that merges each item emitted by the source Observable with the values in an
5657+
* Iterable corresponding to that item that is generated by a selector, while limiting the number of concurrent
5658+
* subscriptions to these Observables.
5659+
* <p>
5660+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.png" alt="">
5661+
* <dl>
5662+
* <dt><b>Scheduler:</b></dt>
5663+
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
5664+
* </dl>
5665+
*
5666+
* @param <R>
5667+
* the type of item emitted by the resulting Observable
5668+
* @param collectionSelector
5669+
* a function that returns an Iterable sequence of values for when given an item emitted by the
5670+
* source Observable
5671+
* @param maxConcurrent
5672+
* the maximum number of Observables that may be subscribed to concurrently
5673+
* @return an Observable that emits the results of merging the items emitted by the source Observable with
5674+
* the values in the Iterables corresponding to those items, as generated by {@code collectionSelector}
5675+
* @throws IllegalArgumentException
5676+
* if {@code maxConcurrent} is less than or equal to 0
5677+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
5678+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5679+
*/
5680+
@Beta
5681+
public final <R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends R>> collectionSelector, int maxConcurrent) {
5682+
return merge(map(OperatorMapPair.convertSelector(collectionSelector)), maxConcurrent);
5683+
}
5684+
56555685
/**
56565686
* Returns an Observable that emits the results of applying a function to the pair of values from the source
56575687
* Observable and an Iterable corresponding to that item that is generated by a selector.
@@ -5681,6 +5711,42 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
56815711
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector);
56825712
}
56835713

5714+
/**
5715+
* Returns an Observable that emits the results of applying a function to the pair of values from the source
5716+
* Observable and an Iterable corresponding to that item that is generated by a selector, while limiting the
5717+
* number of concurrent subscriptions to these Observables.
5718+
* <p>
5719+
* <img width="640" height="390" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeMapIterable.r.png" alt="">
5720+
* <dl>
5721+
* <dt><b>Scheduler:</b></dt>
5722+
* <dd>{@code flatMapIterable} does not operate by default on a particular {@link Scheduler}.</dd>
5723+
* </dl>
5724+
*
5725+
* @param <U>
5726+
* the collection element type
5727+
* @param <R>
5728+
* the type of item emited by the resulting Observable
5729+
* @param collectionSelector
5730+
* a function that returns an Iterable sequence of values for each item emitted by the source
5731+
* Observable
5732+
* @param resultSelector
5733+
* a function that returns an item based on the item emitted by the source Observable and the
5734+
* Iterable returned for that item by the {@code collectionSelector}
5735+
* @param maxConcurrent
5736+
* the maximum number of Observables that may be subscribed to concurrently
5737+
* @return an Observable that emits the items returned by {@code resultSelector} for each item in the source
5738+
* Observable
5739+
* @throws IllegalArgumentException
5740+
* if {@code maxConcurrent} is less than or equal to 0
5741+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
5742+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5743+
*/
5744+
@Beta
5745+
public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Iterable<? extends U>> collectionSelector,
5746+
Func2<? super T, ? super U, ? extends R> resultSelector, int maxConcurrent) {
5747+
return flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
5748+
}
5749+
56845750
/**
56855751
* Subscribes to the {@link Observable} and receives notifications for each element.
56865752
* <p>

0 commit comments

Comments
 (0)