Skip to content

Commit 5468972

Browse files
authored
1.x: add Single.merge(Obs), Obs.flatMapSingle & flatMapCompletable (#5092)
1 parent 828db38 commit 5468972

File tree

7 files changed

+2115
-2
lines changed

7 files changed

+2115
-2
lines changed

src/main/java/rx/Observable.java

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6973,6 +6973,74 @@ public final <U, R> Observable<R> flatMap(final Func1<? super T, ? extends Obser
69736973
return merge(lift(new OperatorMapPair<T, U, R>(collectionSelector, resultSelector)), maxConcurrent);
69746974
}
69756975

6976+
/**
6977+
* Maps all upstream values to Completables and runs them together until the upstream
6978+
* and all inner Completables complete normally.
6979+
* <dl>
6980+
* <dt><b>Backpressure:</b></dt>
6981+
* <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
6982+
* as it doesn't emit items but only terminal event.</dd>
6983+
* <dt><b>Scheduler:</b></dt>
6984+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
6985+
* </dl>
6986+
* @param mapper the function that receives an upstream value and turns it into a Completable
6987+
* to be merged.
6988+
* @return the new Observable instance
6989+
* @see #flatMapCompletable(Func1, boolean, int)
6990+
* @since 1.2.7 - experimental
6991+
*/
6992+
@Experimental
6993+
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper) {
6994+
return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
6995+
}
6996+
6997+
/**
6998+
* Maps all upstream values to Completables and runs them together, optionally delaying any errors, until the upstream
6999+
* and all inner Completables terminate.
7000+
* <dl>
7001+
* <dt><b>Backpressure:</b></dt>
7002+
* <dd>The operator consumes items from upstream in an unbounded manner and ignores downstream backpressure
7003+
* as it doesn't emit items but only terminal event.</dd>
7004+
* <dt><b>Scheduler:</b></dt>
7005+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7006+
* </dl>
7007+
* @param mapper the function that receives an upstream value and turns it into a Completable
7008+
* to be merged.
7009+
* @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
7010+
* the all of them terminate.
7011+
* @return the new Observable instance
7012+
* @since 1.2.7 - experimental
7013+
* @see #flatMapCompletable(Func1, boolean, int)
7014+
*/
7015+
@Experimental
7016+
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors) {
7017+
return flatMapCompletable(mapper, delayErrors, Integer.MAX_VALUE);
7018+
}
7019+
7020+
/**
7021+
* Maps upstream values to Completables and runs up to the given number of them together at a time,
7022+
* optionally delaying any errors, until the upstream and all inner Completables terminate.
7023+
* <dl>
7024+
* <dt><b>Backpressure:</b></dt>
7025+
* <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
7026+
* Completables terminate. The operator ignores downstream backpressure as it doesn't emit items but
7027+
* only the terminal event.</dd>
7028+
* <dt><b>Scheduler:</b></dt>
7029+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7030+
* </dl>
7031+
* @param mapper the function that receives an upstream value and turns it into a Completable
7032+
* to be merged.
7033+
* @param delayErrors if true, errors from the upstream and from the inner Completables get delayed till
7034+
* the all of them terminate.
7035+
* @param maxConcurrency the maximum number of inner Completables to run at a time
7036+
* @return the new Observable instance
7037+
* @since 1.2.7 - experimental
7038+
*/
7039+
@Experimental
7040+
public final Observable<T> flatMapCompletable(Func1<? super T, ? extends Completable> mapper, boolean delayErrors, int maxConcurrency) {
7041+
return unsafeCreate(new OnSubscribeFlatMapCompletable<T>(this, mapper, delayErrors, maxConcurrency));
7042+
}
7043+
69767044
/**
69777045
* Returns an Observable that merges each item emitted by the source Observable with the values in an
69787046
* Iterable corresponding to that item that is generated by a selector.
@@ -7106,6 +7174,74 @@ public final <U, R> Observable<R> flatMapIterable(Func1<? super T, ? extends Ite
71067174
return (Observable<R>)flatMap(OperatorMapPair.convertSelector(collectionSelector), resultSelector, maxConcurrent);
71077175
}
71087176

7177+
/**
7178+
* Maps all upstream values to Singles and runs them together until the upstream
7179+
* and all inner Singles complete normally.
7180+
* <dl>
7181+
* <dt><b>Backpressure:</b></dt>
7182+
* <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
7183+
* <dt><b>Scheduler:</b></dt>
7184+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7185+
* </dl>
7186+
* @param <R> the value type of the inner Singles and the resulting Observable
7187+
* @param mapper the function that receives an upstream value and turns it into a Single
7188+
* to be merged.
7189+
* @return the new Observable instance
7190+
* @see #flatMapSingle(Func1, boolean, int)
7191+
* @since 1.2.7 - experimental
7192+
*/
7193+
@Experimental
7194+
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper) {
7195+
return flatMapSingle(mapper, false, Integer.MAX_VALUE);
7196+
}
7197+
7198+
/**
7199+
* Maps all upstream values to Singles and runs them together, optionally delaying any errors, until the upstream
7200+
* and all inner Singles terminate.
7201+
* <dl>
7202+
* <dt><b>Backpressure:</b></dt>
7203+
* <dd>The operator consumes items from upstream in an unbounded manner and honors downstream backpressure.</dd>
7204+
* <dt><b>Scheduler:</b></dt>
7205+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7206+
* </dl>
7207+
* @param <R> the value type of the inner Singles and the resulting Observable
7208+
* @param mapper the function that receives an upstream value and turns it into a Single
7209+
* to be merged.
7210+
* @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
7211+
* the all of them terminate.
7212+
* @return the new Observable instance
7213+
* @since 1.2.7 - experimental
7214+
* @see #flatMapSingle(Func1, boolean, int)
7215+
*/
7216+
@Experimental
7217+
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors) {
7218+
return flatMapSingle(mapper, delayErrors, Integer.MAX_VALUE);
7219+
}
7220+
7221+
/**
7222+
* Maps upstream values to Singles and runs up to the given number of them together at a time,
7223+
* optionally delaying any errors, until the upstream and all inner Singles terminate.
7224+
* <dl>
7225+
* <dt><b>Backpressure:</b></dt>
7226+
* <dd>The operator consumes at most maxConcurrent items from upstream and one-by-one after as the inner
7227+
* Singles terminate. The operator honors downstream backpressure.</dd>
7228+
* <dt><b>Scheduler:</b></dt>
7229+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7230+
* </dl>
7231+
* @param <R> the value type of the inner Singles and the resulting Observable
7232+
* @param mapper the function that receives an upstream value and turns it into a Single
7233+
* to be merged.
7234+
* @param delayErrors if true, errors from the upstream and from the inner Singles get delayed till
7235+
* the all of them terminate.
7236+
* @param maxConcurrency the maximum number of inner Singles to run at a time
7237+
* @return the new Observable instance
7238+
* @since 1.2.7 - experimental
7239+
*/
7240+
@Experimental
7241+
public final <R> Observable<R> flatMapSingle(Func1<? super T, ? extends Single<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
7242+
return unsafeCreate(new OnSubscribeFlatMapSingle<T, R>(this, mapper, delayErrors, maxConcurrency));
7243+
}
7244+
71097245
/**
71107246
* Subscribes to the {@link Observable} and receives notifications for each element.
71117247
* <p>

src/main/java/rx/Single.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -930,6 +930,97 @@ public static <T> Observable<T> merge(Single<? extends T> t1, Single<? extends T
930930
return Observable.merge(asObservable(t1), asObservable(t2), asObservable(t3), asObservable(t4), asObservable(t5), asObservable(t6), asObservable(t7), asObservable(t8), asObservable(t9));
931931
}
932932

933+
/**
934+
* Merges all Singles emitted by the Observable and runs them together until the source
935+
* Observable and all inner Singles complete normally.
936+
* <dl>
937+
* <dt><b>Backpressure:</b></dt>
938+
* <dd>The operator consumes items from the Observable in an unbounded manner and honors downstream backpressure.</dd>
939+
* <dt><b>Scheduler:</b></dt>
940+
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
941+
* </dl>
942+
* @param <T> the value type of the inner Singles and the resulting Observable
943+
* @param sources the Observable that emits Singles to be merged
944+
* @return the new Observable instance
945+
* @see #merge(Observable, int)
946+
* @see #mergeDelayError(Observable)
947+
* @see #mergeDelayError(Observable, int)
948+
* @since 1.2.7 - experimental
949+
*/
950+
@Experimental
951+
public static <T> Observable<T> merge(Observable<? extends Single<? extends T>> sources) {
952+
return merge(sources, Integer.MAX_VALUE);
953+
}
954+
955+
/**
956+
* Merges the Singles emitted by the Observable and runs up to the given number of them together at a time,
957+
* until the Observable and all inner Singles terminate.
958+
* <dl>
959+
* <dt><b>Backpressure:</b></dt>
960+
* <dd>The operator consumes at most maxConcurrent items from the Observable and one-by-one after as the inner
961+
* Singles terminate. The operator ignores downstream backpressure as it doesn't emit items but
962+
* only the terminal event.</dd>
963+
* <dt><b>Scheduler:</b></dt>
964+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
965+
* </dl>
966+
* @param <T> the value type of the inner Singles and the resulting Observable
967+
* @param sources the Observable that emits Singles to be merged
968+
* @param maxConcurrency the maximum number of inner Singles to run at a time
969+
* @return the new Observable instance
970+
* @since 1.2.7 - experimental
971+
*/
972+
@SuppressWarnings({ "unchecked", "rawtypes" })
973+
@Experimental
974+
public static <T> Observable<T> merge(Observable<? extends Single<? extends T>> sources, int maxConcurrency) {
975+
return sources.flatMapSingle((Func1)UtilityFunctions.identity(), false, maxConcurrency);
976+
}
977+
978+
/**
979+
* Merges all Singles emitted by the Observable and runs them together,
980+
* delaying errors from them and the Observable, until the source
981+
* Observable and all inner Singles complete normally.
982+
* <dl>
983+
* <dt><b>Backpressure:</b></dt>
984+
* <dd>The operator consumes items from the Observable in an unbounded manner and honors downstream backpressure.</dd>
985+
* <dt><b>Scheduler:</b></dt>
986+
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
987+
* </dl>
988+
* @param <T> the value type of the inner Singles and the resulting Observable
989+
* @param sources the Observable that emits Singles to be merged
990+
* @return the new Observable instance
991+
* @see #mergeDelayError(Observable, int)
992+
* @see #merge(Observable)
993+
* @see #merge(Observable, int)
994+
* @since 1.2.7 - experimental
995+
*/
996+
@Experimental
997+
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<? extends T>> sources) {
998+
return merge(sources, Integer.MAX_VALUE);
999+
}
1000+
1001+
/**
1002+
* Merges the Singles emitted by the Observable and runs up to the given number of them together at a time,
1003+
* delaying errors from them and the Observable, until the Observable and all inner Singles terminate.
1004+
* <dl>
1005+
* <dt><b>Backpressure:</b></dt>
1006+
* <dd>The operator consumes at most maxConcurrent items from the Observable and one-by-one after as the inner
1007+
* Singles terminate. The operator ignores downstream backpressure as it doesn't emit items but
1008+
* only the terminal event.</dd>
1009+
* <dt><b>Scheduler:</b></dt>
1010+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
1011+
* </dl>
1012+
* @param <T> the value type of the inner Singles and the resulting Observable
1013+
* @param sources the Observable that emits Singles to be merged
1014+
* @param maxConcurrency the maximum number of inner Singles to run at a time
1015+
* @return the new Observable instance
1016+
* @since 1.2.7 - experimental
1017+
*/
1018+
@SuppressWarnings({ "unchecked", "rawtypes" })
1019+
@Experimental
1020+
public static <T> Observable<T> mergeDelayError(Observable<? extends Single<? extends T>> sources, int maxConcurrency) {
1021+
return sources.flatMapSingle((Func1)UtilityFunctions.identity(), true, maxConcurrency);
1022+
}
1023+
9331024
/**
9341025
* Returns a Single that emits the results of a specified combiner function applied to two items emitted by
9351026
* two other Singles.

0 commit comments

Comments
 (0)