Skip to content

Commit fbefa23

Browse files
committed
1.x: concatMap full rewrite + delayError + performance
1 parent 662ce3b commit fbefa23

File tree

9 files changed

+769
-254
lines changed

9 files changed

+769
-254
lines changed

src/main/java/rx/Observable.java

Lines changed: 72 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -922,8 +922,9 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<?
922922
* {@code observables}, one after the other, without interleaving them
923923
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
924924
*/
925+
@SuppressWarnings({ "unchecked", "rawtypes" })
925926
public static <T> Observable<T> concat(Observable<? extends Observable<? extends T>> observables) {
926-
return observables.lift(OperatorConcat.<T>instance());
927+
return observables.concatMap((Func1)UtilityFunctions.identity());
927928
}
928929

929930
/**
@@ -1158,6 +1159,45 @@ public static <T> Observable<T> concat(Observable<? extends T> t1, Observable<?
11581159
return concat(just(t1, t2, t3, t4, t5, t6, t7, t8, t9));
11591160
}
11601161

1162+
/**
1163+
* Concatenates the Observable sequence of Observables into a single sequence by subscribing to each inner Observable,
1164+
* one after the other, one at a time and delays any errors till the all inner and the outer Observables terminate.
1165+
*
1166+
* <dl>
1167+
* <dt><b>Backpressure:</b></dt>
1168+
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
1169+
* <dt><b>Scheduler:</b></dt>
1170+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1171+
* </dl>
1172+
*
1173+
* @param sources the Observable sequence of Observables
1174+
* @return the new Observable with the concatenating behavior
1175+
*/
1176+
@SuppressWarnings({ "rawtypes", "unchecked" })
1177+
@Experimental
1178+
public static <T> Observable<T> concatDelayError(Observable<? extends Observable<? extends T>> sources) {
1179+
return sources.concatMapDelayError((Func1)UtilityFunctions.identity());
1180+
}
1181+
1182+
/**
1183+
* Concatenates the Iterable sequence of Observables into a single sequence by subscribing to each Observable,
1184+
* one after the other, one at a time and delays any errors till the all inner Observables terminate.
1185+
*
1186+
* <dl>
1187+
* <dt><b>Backpressure:</b></dt>
1188+
* <dd>{@code concatDelayError} fully supports backpressure.</dd>
1189+
* <dt><b>Scheduler:</b></dt>
1190+
* <dd>{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
1191+
* </dl>
1192+
*
1193+
* @param sources the Iterable sequence of Observables
1194+
* @return the new Observable with the concatenating behavior
1195+
*/
1196+
@Experimental
1197+
public static <T> Observable<T> concatDelayError(Iterable<? extends Observable<? extends T>> sources) {
1198+
return concatDelayError(from(sources));
1199+
}
1200+
11611201
/**
11621202
* Returns an Observable that calls an Observable factory to create an Observable for each new Observer
11631203
* that subscribes. That is, for each subscriber, the actual Observable that subscriber observes is
@@ -3957,7 +3997,37 @@ public final R call(R state, T value) {
39573997
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
39583998
*/
39593999
public final <R> Observable<R> concatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
3960-
return concat(map(func));
4000+
if (this instanceof ScalarSynchronousObservable) {
4001+
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4002+
return scalar.scalarFlatMap(func);
4003+
}
4004+
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.IMMEDIATE));
4005+
}
4006+
4007+
/**
4008+
* Maps each of the items into an Observable, subscribes to them one after the other,
4009+
* one at a time and emits their values in order
4010+
* while delaying any error from either this or any of the inner Observables
4011+
* till all of them terminate.
4012+
*
4013+
* <dl>
4014+
* <dt><b>Backpressure:</b></dt>
4015+
* <dd>{@code concatMapDelayError} fully supports backpressure.</dd>
4016+
* <dt><b>Scheduler:</b></dt>
4017+
* <dd>{@code concatMapDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
4018+
* </dl>
4019+
*
4020+
* @param <R> the result value type
4021+
* @param func the function that maps the items of this Observable into the inner Observables.
4022+
* @return the new Observable instance with the concatenation behavior
4023+
*/
4024+
@Experimental
4025+
public final <R> Observable<R> concatMapDelayError(Func1<? super T, ? extends Observable<?extends R>> func) {
4026+
if (this instanceof ScalarSynchronousObservable) {
4027+
ScalarSynchronousObservable<T> scalar = (ScalarSynchronousObservable<T>) this;
4028+
return scalar.scalarFlatMap(func);
4029+
}
4030+
return create(new OnSubscribeConcatMap<T, R>(this, func, 2, OnSubscribeConcatMap.END));
39614031
}
39624032

39634033
/**

src/main/java/rx/exceptions/CompositeException.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,10 @@
1515
*/
1616
package rx.exceptions;
1717

18-
import java.io.PrintStream;
19-
import java.io.PrintWriter;
20-
import java.util.ArrayList;
21-
import java.util.Collection;
22-
import java.util.Collections;
23-
import java.util.HashSet;
24-
import java.util.LinkedHashSet;
25-
import java.util.List;
26-
import java.util.Set;
18+
import java.io.*;
19+
import java.util.*;
20+
21+
import rx.annotations.Experimental;
2722

2823
/**
2924
* Represents an exception that is a composite of one or more other exceptions. A {@code CompositeException}
@@ -73,6 +68,34 @@ public CompositeException(Collection<? extends Throwable> errors) {
7368
this(null, errors);
7469
}
7570

71+
/**
72+
* Constructs a CompositeException instance with the supplied initial Throwables.
73+
* @param errors the array of Throwables
74+
*/
75+
@Experimental
76+
public CompositeException(Throwable... errors) {
77+
Set<Throwable> deDupedExceptions = new LinkedHashSet<Throwable>();
78+
List<Throwable> _exceptions = new ArrayList<Throwable>();
79+
if (errors != null) {
80+
for (Throwable ex : errors) {
81+
if (ex instanceof CompositeException) {
82+
deDupedExceptions.addAll(((CompositeException) ex).getExceptions());
83+
} else
84+
if (ex != null) {
85+
deDupedExceptions.add(ex);
86+
} else {
87+
deDupedExceptions.add(new NullPointerException());
88+
}
89+
}
90+
} else {
91+
deDupedExceptions.add(new NullPointerException());
92+
}
93+
94+
_exceptions.addAll(deDupedExceptions);
95+
this.exceptions = Collections.unmodifiableList(_exceptions);
96+
this.message = exceptions.size() + " exceptions occurred. ";
97+
}
98+
7699
/**
77100
* Retrieves the list of exceptions that make up the {@code CompositeException}
78101
*

0 commit comments

Comments
 (0)