Skip to content

Commit b8bccd9

Browse files
Shyishakarnokd
authored andcommitted
Added zip function with Observable array. (#4036)
1 parent 6bef4ce commit b8bccd9

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3119,6 +3119,54 @@ public static <R> Observable<R> zip(Iterable<? extends Observable<?>> ws, FuncN<
31193119
return Observable.just(os.toArray(new Observable<?>[os.size()])).lift(new OperatorZip<R>(zipFunction));
31203120
}
31213121

3122+
/**
3123+
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
3124+
* items emitted, in sequence, by an array of other Observables.
3125+
* <p>
3126+
* {@code zip} applies this function in strict sequence, so the first item emitted by the new Observable
3127+
* will be the result of the function applied to the first item emitted by each of the source Observables;
3128+
* the second item emitted by the new Observable will be the result of the function applied to the second
3129+
* item emitted by each of those Observables; and so forth.
3130+
* <p>
3131+
* The resulting {@code Observable<R>} returned from {@code zip} will invoke {@code onNext} as many times as
3132+
* the number of {@code onNext} invocations of the source Observable that emits the fewest items.
3133+
* <p>
3134+
* The operator subscribes to its sources in order they are specified and completes eagerly if
3135+
* one of the sources is shorter than the rest while unsubscribing the other sources. Therefore, it
3136+
* is possible those other sources will never be able to run to completion (and thus not calling
3137+
* {@code doOnCompleted()}). This can also happen if the sources are exactly the same length; if
3138+
* source A completes and B has been consumed and is about to complete, the operator detects A won't
3139+
* be sending further values and it will unsubscribe B immediately. For example:
3140+
* <pre><code>zip(new Observable[]{range(1, 5).doOnCompleted(action1), range(6, 5).doOnCompleted(action2)}, (a) -&gt;
3141+
* a)</code></pre>
3142+
* {@code action1} will be called but {@code action2} won't.
3143+
* <br>To work around this termination property,
3144+
* use {@code doOnUnsubscribed()} as well or use {@code using()} to do cleanup in case of completion
3145+
* or unsubscription.
3146+
* <p>
3147+
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
3148+
* <dl>
3149+
* <dt><b>Backpressure:</b><dt>
3150+
* <dd>The operator expects backpressure from the sources and honors backpressure from the downstream.
3151+
* (I.e., zipping with {@link #interval(long, TimeUnit)} may result in MissingBackpressureException, use
3152+
* one of the {@code onBackpressureX} to handle similar, backpressure-ignoring sources.</dd>
3153+
* <dt><b>Scheduler:</b></dt>
3154+
* <dd>{@code zip} does not operate by default on a particular {@link Scheduler}.</dd>
3155+
* </dl>
3156+
*
3157+
* @param ws
3158+
* an array of source Observables
3159+
* @param zipFunction
3160+
* a function that, when applied to an item emitted by each of the source Observables, results in
3161+
* an item that will be emitted by the resulting Observable
3162+
* @return an Observable that emits the zipped results
3163+
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
3164+
*/
3165+
@Experimental
3166+
public static <R> Observable<R> zip(Observable<?>[] ws, FuncN<? extends R> zipFunction) {
3167+
return Observable.just(ws).lift(new OperatorZip<R>(zipFunction));
3168+
}
3169+
31223170
/**
31233171
* Returns an Observable that emits the results of a specified combiner function applied to combinations of
31243172
* <i>n</i> items emitted, in sequence, by the <i>n</i> Observables emitted by a specified Observable.

src/test/java/rx/internal/operators/OperatorZipTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,68 @@ public Object call(final Object... args) {
998998
ts.assertReceivedOnNext(Collections.emptyList());
999999
}
10001000

1001+
@Test
1002+
public void testZipEmptyArray() {
1003+
Observable<Integer>[] ws = new Observable[0];
1004+
Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
1005+
@Override
1006+
public Integer call(Object... args) {
1007+
assertEquals("No argument should have been passed", 0, args.length);
1008+
return 0;
1009+
}
1010+
});
1011+
1012+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
1013+
o.subscribe(ts);
1014+
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
1015+
ts.assertReceivedOnNext(Collections.emptyList());
1016+
}
1017+
1018+
@Test
1019+
public void testZipArraySingleItem() {
1020+
final Integer expected = 0;
1021+
Observable<Integer>[] ws = new Observable[]{ Observable.just(expected) };
1022+
1023+
Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
1024+
@Override
1025+
public Integer call(Object... args) {
1026+
assertEquals("One argument should have been passed", 1, args.length);
1027+
return expected;
1028+
}
1029+
});
1030+
1031+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1032+
o.subscribe(ts);
1033+
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
1034+
ts.assertReceivedOnNext(Collections.singletonList(expected));
1035+
}
1036+
1037+
@Test
1038+
public void testZipBigArray() {
1039+
final int size = 20;
1040+
Integer expected = 0;
1041+
Observable<Integer>[] ws = new Observable[size];
1042+
1043+
for (int i = 0, wsLength = ws.length; i < wsLength; i++) {
1044+
ws[i] = Observable.just(i);
1045+
expected += i;
1046+
}
1047+
1048+
final Integer finalExpected = expected;
1049+
Observable<Integer> o = Observable.zip(ws, new FuncN<Integer>() {
1050+
@Override
1051+
public Integer call(Object... args) {
1052+
assertEquals(size + " arguments should have been passed", size, args.length);
1053+
return finalExpected;
1054+
}
1055+
});
1056+
1057+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
1058+
o.subscribe(ts);
1059+
ts.awaitTerminalEvent(200, TimeUnit.MILLISECONDS);
1060+
ts.assertReceivedOnNext(Collections.singletonList(expected));
1061+
}
1062+
10011063
/**
10021064
* Expect NoSuchElementException instead of blocking forever as zip should emit onCompleted and no onNext
10031065
* and last() expects at least a single response.

0 commit comments

Comments
 (0)