Skip to content

Commit 044620e

Browse files
committed
Merge pull request #3759 from akarnokd/OnSubscribeConcatMap1x
1.x: concatMap full rewrite + delayError + performance
2 parents a42d0bf + fbefa23 commit 044620e

File tree

9 files changed

+769
-254
lines changed

9 files changed

+769
-254
lines changed

src/main/java/rx/Observable.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -949,8 +949,9 @@ public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Ob
949949
* {@code observables}, one after the other, without interleaving them
950950
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
951951
*/
952+
@SuppressWarnings({ "unchecked", "rawtypes" })
952953
public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
953-
return observables.lift(OperatorConcat.<T>instance());
954+
return observables.concatMap((Func1)UtilityFunctions.identity());
954955
}
955956

956957
/**
@@ -1185,6 +1186,45 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
11851186
return concat(just(t1, t2, t3, t4, t5, t6, t7, t8, t9));
11861187
}
11871188

1189+
/**
1190+
* Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable,
1191+
* one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.
1192+
*
1193+
* <dl>
1194+
* <dt><b>Backpressure:</b></dt>
1195+
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
1196+
* <dt><b>Scheduler:</b></dt>
1197+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1198+
* </dl>
1199+
*
1200+
* @param sources the Observable sequence of Observables
1201+
* @return the new Observable with the concatenating behavior
1202+
*/
1203+
@SuppressWarnings({ "rawtypes", "unchecked" })
1204+
@Experimental
1205+
public static <T> Observable<T> concatDelayError(Observable<? extends Observable<? extends T>> sources) {
1206+
return sources.concatMapDelayError((Func1)UtilityFunctions.identity());
1207+
}
1208+
1209+
/**
1210+
* Concatenates the Iterable sequence of Observables into a single sequence by subscribing to each Observable,
1211+
* one after the other, one at a time and delays any errors till the all inner Observables terminate.
1212+
*
1213+
* <dl>
1214+
* <dt><b>Backpressure:</b></dt>
1215+
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
1216+
* <dt><b>Scheduler:</b></dt>
1217+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1218+
* </dl>
1219+
*
1220+
* @param sources the Iterable sequence of Observables
1221+
* @return the new Observable with the concatenating behavior
1222+
*/
1223+
@Experimental
1224+
public static <T> Observable<T> concatDelayError(Iterable<? extends Observable<? extends T>> sources) {
1225+
return concatDelayError(from(sources));
1226+
}
1227+
11881228
/**
11891229
* Returns an Observable that calls an Observable factory to create an Observable for each new Observer
11901230
* that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is
@@ -3984,7 +4024,37 @@ public final R call(R state, T value) {
39844024
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
39854025
*/
39864026
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3987-
return concat(map(func));
4027+
if (this instanceof ScalarSynchronousObservable) {
4028+
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4029+
return scalar.scalarFlatMap(func);
4030+
}
4031+
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
4032+
}
4033+
4034+
/**
4035+
* Maps each of the items into an Observable, subscribes to them one after the other,
4036+
* one at a time and emits their values in order
4037+
* while delaying any error from either this or any of the inner Observables
4038+
* till all of them terminate.
4039+
*
4040+
* <dl>
4041+
* <dt><b>Backpressure:</b></dt>
4042+
* <dd>{@code concatMapDelayError} fully supports backpressure.</dd>
4043+
* <dt><b>Scheduler:</b></dt>
4044+
* <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
4045+
* </dl>
4046+
*
4047+
* @param <R> the result value type
4048+
* @param func the function that maps the items of this Observable into the inner Observables.
4049+
* @return the new Observable instance with the concatenation behavior
4050+
*/
4051+
@Experimental
4052+
public final <R> Observable<R> concatMapDelayError(Func1<? super T, ? extends Observable<?extends R>> func) {
4053+
if (this instanceof ScalarSynchronousObservable) {
4054+
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4055+
return scalar.scalarFlatMap(func);
4056+
}
4057+
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.END));
39884058
}
39894059

39904060
/**

src/main/java/rx/exceptions/CompositeException.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,10 @@
1515
*/
1616
package rx.exceptions;
1717

18-
import java.io.PrintStream;
19-
import java.io.PrintWriter;
20-
import java.util.ArrayList;
21-
import java.util.Collection;
22-
import java.util.Collections;
23-
import java.util.HashSet;
24-
import java.util.LinkedHashSet;
25-
import java.util.List;
26-
import java.util.Set;
18+
import java.io.*;
19+
import java.util.*;
20+
21+
import rx.annotations.Experimental;
2722

2823
/**
2924
* Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException}
@@ -73,6 +68,34 @@ public CompositeException(Collection<? extends Throwable> errors) {
7368
this(null, errors);
7469
}
7570

71+
/**
72+
* Constructs a CompositeException instance with the supplied initial Throwables.
73+
* @param errors the array of Throwables
74+
*/
75+
@Experimental
76+
public CompositeException(Throwable... errors) {
77+
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
78+
List<Throwable> _exceptions = new ArrayList<Throwable>();
79+
if (errors != null) {
80+
for (Throwable ex : errors) {
81+
if (ex instanceof CompositeException) {
82+
deDupedExceptions.addAll(((CompositeException) ex).getExceptions());
83+
} else
84+
if (ex != null) {
85+
deDupedExceptions.add(ex);
86+
} else {
87+
deDupedExceptions.add(new NullPointerException());
88+
}
89+
}
90+
} else {
91+
deDupedExceptions.add(new NullPointerException());
92+
}
93+
94+
_exceptions.addAll(deDupedExceptions);
95+
this.exceptions = Collections.unmodifiableList(_exceptions);
96+
this.message = exceptions.size() + " exceptions occurred. ";
97+
}
98+
7699
/**
77100
* Retrieves the list of exceptions that make up the {@code CompositeException}
78101
*

0 commit comments

Comments
 (0)