Skip to content

Commit f00533c

Browse files
authored
2.x: Add missing {Maybe|Single}.mergeDelayError variants (#5799)
* 2.x: Add missing {Maybe|Single}.mergeDelayError variants * In javadoc, use "a SingleSource to be merged"
1 parent 2adb1fa commit f00533c

File tree

4 files changed

+399
-16
lines changed

4 files changed

+399
-16
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,12 +1180,12 @@ public static <T> Flowable<T> mergeDelayError(Iterable<? extends MaybeSource<? e
11801180

11811181
/**
11821182
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
1183-
* receive all successfully emitted items from all of the source Publishers without being interrupted by
1184-
* an error notification from one of them.
1183+
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
1184+
* an error notification from one of them or even the main Publisher.
11851185
* <p>
1186-
* This behaves like {@link #merge(Publisher)} except that if any of the merged Publishers notify of an
1186+
* This behaves like {@link #merge(Publisher)} except that if any of the merged MaybeSources notify of an
11871187
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
1188-
* error notification until all of the merged Publishers have finished emitting items.
1188+
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
11891189
* <p>
11901190
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
11911191
* <p>
@@ -1214,6 +1214,46 @@ public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<?
12141214
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true);
12151215
}
12161216

1217+
1218+
/**
1219+
* Flattens a Publisher that emits MaybeSources into one Publisher, in a way that allows a Subscriber to
1220+
* receive all successfully emitted items from all of the source MaybeSources without being interrupted by
1221+
* an error notification from one of them or even the main Publisher as well as limiting the total number of active MaybeSources.
1222+
* <p>
1223+
* This behaves like {@link #merge(Publisher, int)} except that if any of the merged MaybeSources notify of an
1224+
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
1225+
* error notification until all of the merged MaybeSources and the main Publisher have finished emitting items.
1226+
* <p>
1227+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/mergeDelayError.png" alt="">
1228+
* <p>
1229+
* Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only
1230+
* invoke the {@code onError} method of its Subscribers once.
1231+
* <dl>
1232+
* <dt><b>Backpressure:</b></dt>
1233+
* <dd>The operator honors backpressure from downstream. The outer {@code Publisher} is consumed
1234+
* in unbounded mode (i.e., no backpressure is applied to it).</dd>
1235+
* <dt><b>Scheduler:</b></dt>
1236+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1237+
* </dl>
1238+
*
1239+
* @param <T> the common element base type
1240+
* @param sources
1241+
* a Publisher that emits MaybeSources
1242+
* @param maxConcurrency the maximum number of active inner MaybeSources to be merged at a time
1243+
* @return a Flowable that emits all of the items emitted by the Publishers emitted by the
1244+
* {@code source} Publisher
1245+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1246+
* @since 2.1.9 - experimental
1247+
*/
1248+
@SuppressWarnings({ "unchecked", "rawtypes" })
1249+
@BackpressureSupport(BackpressureKind.FULL)
1250+
@CheckReturnValue
1251+
@SchedulerSupport(SchedulerSupport.NONE)
1252+
@Experimental
1253+
public static <T> Flowable<T> mergeDelayError(Publisher<? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
1254+
return Flowable.fromPublisher(sources).flatMap((Function)MaybeToPublisher.instance(), true, maxConcurrency);
1255+
}
1256+
12171257
/**
12181258
* Flattens two MaybeSources into one Flowable, in a way that allows a Subscriber to receive all
12191259
* successfully emitted items from each of the source MaybeSources without being interrupted by an error

src/main/java/io/reactivex/Single.java

Lines changed: 186 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -787,9 +787,9 @@ public static <T> Single<T> merge(SingleSource<? extends SingleSource<? extends
787787
*
788788
* @param <T> the common value type
789789
* @param source1
790-
* a Single to be merged
790+
* a SingleSource to be merged
791791
* @param source2
792-
* a Single to be merged
792+
* a SingleSource to be merged
793793
* @return a Flowable that emits all of the items emitted by the source Singles
794794
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
795795
* @see #mergeDelayError(SingleSource, SingleSource)
@@ -835,11 +835,11 @@ public static <T> Flowable<T> merge(
835835
*
836836
* @param <T> the common value type
837837
* @param source1
838-
* a Single to be merged
838+
* a SingleSource to be merged
839839
* @param source2
840-
* a Single to be merged
840+
* a SingleSource to be merged
841841
* @param source3
842-
* a Single to be merged
842+
* a SingleSource to be merged
843843
* @return a Flowable that emits all of the items emitted by the source Singles
844844
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
845845
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource)
@@ -880,20 +880,20 @@ public static <T> Flowable<T> merge(
880880
* {@link RxJavaPlugins#onError(Throwable)} method as {@code UndeliverableException} errors. Similarly, {@code Throwable}s
881881
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
882882
* (composite) error will be sent to the same global error handler.
883-
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
883+
* Use {@link #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)} to merge sources and terminate only when all source {@code SingleSource}s
884884
* have completed or failed with an error.
885885
* </dd>
886886
* </dl>
887887
*
888888
* @param <T> the common value type
889889
* @param source1
890-
* a Single to be merged
890+
* a SingleSource to be merged
891891
* @param source2
892-
* a Single to be merged
892+
* a SingleSource to be merged
893893
* @param source3
894-
* a Single to be merged
894+
* a SingleSource to be merged
895895
* @param source4
896-
* a Single to be merged
896+
* a SingleSource to be merged
897897
* @return a Flowable that emits all of the items emitted by the source Singles
898898
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
899899
* @see #mergeDelayError(SingleSource, SingleSource, SingleSource, SingleSource)
@@ -913,6 +913,181 @@ public static <T> Flowable<T> merge(
913913
return merge(Flowable.fromArray(source1, source2, source3, source4));
914914
}
915915

916+
917+
/**
918+
* Merges an Iterable sequence of SingleSource instances into a single Flowable sequence,
919+
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
920+
* <dl>
921+
* <dt><b>Backpressure:</b></dt>
922+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
923+
* <dt><b>Scheduler:</b></dt>
924+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
925+
* </dl>
926+
* @param <T> the common and resulting value type
927+
* @param sources the Iterable sequence of SingleSource sources
928+
* @return the new Flowable instance
929+
* @since 2.1.9 - experimental
930+
* @see #merge(Iterable)
931+
*/
932+
@CheckReturnValue
933+
@BackpressureSupport(BackpressureKind.FULL)
934+
@SchedulerSupport(SchedulerSupport.NONE)
935+
@Experimental
936+
public static <T> Flowable<T> mergeDelayError(Iterable<? extends SingleSource<? extends T>> sources) {
937+
return mergeDelayError(Flowable.fromIterable(sources));
938+
}
939+
940+
/**
941+
* Merges a Flowable sequence of SingleSource instances into a single Flowable sequence,
942+
* running all SingleSources at once and delaying any error(s) until all sources succeed or fail.
943+
* <dl>
944+
* <dt><b>Backpressure:</b></dt>
945+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
946+
* <dt><b>Scheduler:</b></dt>
947+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
948+
* </dl>
949+
* @param <T> the common and resulting value type
950+
* @param sources the Flowable sequence of SingleSource sources
951+
* @return the new Flowable instance
952+
* @see #merge(Publisher)
953+
* @since 2.1.9 - experimental
954+
*/
955+
@CheckReturnValue
956+
@BackpressureSupport(BackpressureKind.FULL)
957+
@SchedulerSupport(SchedulerSupport.NONE)
958+
@SuppressWarnings({ "unchecked", "rawtypes" })
959+
@Experimental
960+
public static <T> Flowable<T> mergeDelayError(Publisher<? extends SingleSource<? extends T>> sources) {
961+
ObjectHelper.requireNonNull(sources, "sources is null");
962+
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
963+
}
964+
965+
966+
/**
967+
* Flattens two Singles into a single Flowable, without any transformation, delaying
968+
* any error(s) until all sources succeed or fail.
969+
* <p>
970+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
971+
* <p>
972+
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by
973+
* using the {@code mergeDelayError} method.
974+
* <dl>
975+
* <dt><b>Backpressure:</b></dt>
976+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
977+
* <dt><b>Scheduler:</b></dt>
978+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
979+
* </dl>
980+
*
981+
* @param <T> the common value type
982+
* @param source1
983+
* a SingleSource to be merged
984+
* @param source2
985+
* a SingleSource to be merged
986+
* @return a Flowable that emits all of the items emitted by the source Singles
987+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
988+
* @see #merge(SingleSource, SingleSource)
989+
* @since 2.1.9 - experimental
990+
*/
991+
@CheckReturnValue
992+
@BackpressureSupport(BackpressureKind.FULL)
993+
@SchedulerSupport(SchedulerSupport.NONE)
994+
@SuppressWarnings("unchecked")
995+
@Experimental
996+
public static <T> Flowable<T> mergeDelayError(
997+
SingleSource<? extends T> source1, SingleSource<? extends T> source2
998+
) {
999+
ObjectHelper.requireNonNull(source1, "source1 is null");
1000+
ObjectHelper.requireNonNull(source2, "source2 is null");
1001+
return mergeDelayError(Flowable.fromArray(source1, source2));
1002+
}
1003+
1004+
/**
1005+
* Flattens three Singles into a single Flowable, without any transformation, delaying
1006+
* any error(s) until all sources succeed or fail.
1007+
* <p>
1008+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
1009+
* <p>
1010+
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
1011+
* the {@code mergeDelayError} method.
1012+
* <dl>
1013+
* <dt><b>Backpressure:</b></dt>
1014+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
1015+
* <dt><b>Scheduler:</b></dt>
1016+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1017+
* </dl>
1018+
*
1019+
* @param <T> the common value type
1020+
* @param source1
1021+
* a SingleSource to be merged
1022+
* @param source2
1023+
* a SingleSource to be merged
1024+
* @param source3
1025+
* a SingleSource to be merged
1026+
* @return a Flowable that emits all of the items emitted by the source Singles
1027+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1028+
* @see #merge(SingleSource, SingleSource, SingleSource)
1029+
* @since 2.1.9 - experimental
1030+
*/
1031+
@CheckReturnValue
1032+
@BackpressureSupport(BackpressureKind.FULL)
1033+
@SchedulerSupport(SchedulerSupport.NONE)
1034+
@SuppressWarnings("unchecked")
1035+
@Experimental
1036+
public static <T> Flowable<T> mergeDelayError(
1037+
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
1038+
SingleSource<? extends T> source3
1039+
) {
1040+
ObjectHelper.requireNonNull(source1, "source1 is null");
1041+
ObjectHelper.requireNonNull(source2, "source2 is null");
1042+
ObjectHelper.requireNonNull(source3, "source3 is null");
1043+
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
1044+
}
1045+
1046+
/**
1047+
* Flattens four Singles into a single Flowable, without any transformation, delaying
1048+
* any error(s) until all sources succeed or fail.
1049+
* <p>
1050+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.merge.png" alt="">
1051+
* <p>
1052+
* You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using
1053+
* the {@code mergeDelayError} method.
1054+
* <dl>
1055+
* <dt><b>Backpressure:</b></dt>
1056+
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
1057+
* <dt><b>Scheduler:</b></dt>
1058+
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1059+
* </dl>
1060+
*
1061+
* @param <T> the common value type
1062+
* @param source1
1063+
* a SingleSource to be merged
1064+
* @param source2
1065+
* a SingleSource to be merged
1066+
* @param source3
1067+
* a SingleSource to be merged
1068+
* @param source4
1069+
* a SingleSource to be merged
1070+
* @return a Flowable that emits all of the items emitted by the source Singles
1071+
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
1072+
* @see #merge(SingleSource, SingleSource, SingleSource, SingleSource)
1073+
* @since 2.1.9 - experimental
1074+
*/
1075+
@CheckReturnValue
1076+
@BackpressureSupport(BackpressureKind.FULL)
1077+
@SchedulerSupport(SchedulerSupport.NONE)
1078+
@SuppressWarnings("unchecked")
1079+
@Experimental
1080+
public static <T> Flowable<T> mergeDelayError(
1081+
SingleSource<? extends T> source1, SingleSource<? extends T> source2,
1082+
SingleSource<? extends T> source3, SingleSource<? extends T> source4
1083+
) {
1084+
ObjectHelper.requireNonNull(source1, "source1 is null");
1085+
ObjectHelper.requireNonNull(source2, "source2 is null");
1086+
ObjectHelper.requireNonNull(source3, "source3 is null");
1087+
ObjectHelper.requireNonNull(source4, "source4 is null");
1088+
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
1089+
}
1090+
9161091
/**
9171092
* Returns a singleton instance of a never-signalling Single (only calls onSubscribe).
9181093
* <dl>
@@ -2417,7 +2592,7 @@ public final Single<Boolean> contains(final Object value, final BiPredicate<Obje
24172592
* </dl>
24182593
*
24192594
* @param other
2420-
* a Single to be merged
2595+
* a SingleSource to be merged
24212596
* @return that emits all of the items emitted by the source Singles
24222597
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
24232598
*/

0 commit comments

Comments
 (0)