Skip to content

Commit d0b3bee

Browse files
PaulWoitaschekakarnokd
authored andcommitted
Added Observable.concat(Iterable) (#4330)
* Added Observable.concat(Iterable) * Adjusted testConcatWithIterableOfObservable to really test iterables. Before it was testing the same as testConcatWithObservableOfObservable
1 parent fabaa95 commit d0b3bee

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,6 +1101,32 @@ public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Ob
11011101
return create(new OnSubscribeCombineLatest<T, R>(null, sources, combineFunction, RxRingBuffer.SIZE, true));
11021102
}
11031103

1104+
/**
1105+
* Flattens an Iterable of Observables into one Observable, one after the other, without
1106+
* interleaving them.
1107+
* <p>
1108+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concat.png" alt="">
1109+
* <dl>
1110+
* <dt><b>Backpressure:</b></dt>
1111+
* <dd>The operator honors backpressure from downstream. The {@code Observable}
1112+
* sources are expected to honor backpressure as well.
1113+
* If any of the source {@code Observable}s violate this, it <em>may</em> throw an
1114+
* {@code IllegalStateException} when the source {@code Observable} completes.</dd>
1115+
* <dt><b>Scheduler:</b></dt>
1116+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
1117+
* </dl>
1118+
*
1119+
* @param <T> the common element base type
1120+
* @param sequences
1121+
* the Iterable of Observables
1122+
* @return an Observable that emits items that are the result of flattening the items emitted by the
1123+
* Observables in the Iterable, one after the other, without interleaving them
1124+
* @see <a href="http://reactivex.io/documentation/operators/concat.html">ReactiveX operators documentation: Concat</a>
1125+
*/
1126+
public static <T> Observable<T> concat(Iterable<? extends Observable<? extends T>> sequences) {
1127+
return concat(from(sequences));
1128+
}
1129+
11041130
/**
11051131
* Returns an Observable that emits the items emitted by each of the Observables emitted by the source
11061132
* Observable, one after the other, without interleaving them.

src/test/java/rx/ConcatTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void testConcatWithIterableOfObservable() {
6969
@SuppressWarnings("unchecked")
7070
Iterable<Observable<String>> is = Arrays.asList(o1, o2, o3);
7171

72-
List<String> values = Observable.concat(Observable.from(is)).toList().toBlocking().single();
72+
List<String> values = Observable.concat(is).toList().toBlocking().single();
7373

7474
assertEquals("one", values.get(0));
7575
assertEquals("two", values.get(1));

0 commit comments

Comments
 (0)