Skip to content

Commit 4b80956

Browse files
committed
1.x: combineLatestDelayError
1 parent 662ce3b commit 4b80956

File tree

3 files changed

+132
-2
lines changed

3 files changed

+132
-2
lines changed

src/main/java/rx/Observable.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,33 @@ public static <T, R> Observable<R> combineLatest(Iterable<? extends Observable<?
906906
return create(new OnSubscribeCombineLatest<T, R>(sources, combineFunction));
907907
}
908908

909+
/**
910+
* Combines a collection of source Observables by emitting an item that aggregates the latest values of each of
911+
* the source Observables each time an item is received from any of the source Observables, where this
912+
* aggregation is defined by a specified function and delays any error from the sources until
913+
* all source Observables terminate.
914+
*
915+
* <dl>
916+
* <dt><b>Scheduler:</b></dt>
917+
* <dd>{@code combineLatest} does not operate by default on a particular {@link Scheduler}.</dd>
918+
* </dl>
919+
*
920+
* @param <T>
921+
* the common base type of source values
922+
* @param <R>
923+
* the result type
924+
* @param sources
925+
* the collection of source Observables
926+
* @param combineFunction
927+
* the aggregation function used to combine the items emitted by the source Observables
928+
* @return an Observable that emits items that are the result of combining the items emitted by the source
929+
* Observables by means of the given aggregation function
930+
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
931+
*/
932+
public static <T, R> Observable<R> combineLatestDelayError(Iterable<? extends Observable<? extends T>> sources, FuncN<? extends R> combineFunction) {
933+
return create(new OnSubscribeCombineLatest<T, R>(null, sources, combineFunction, RxRingBuffer.SIZE, true));
934+
}
935+
909936
/**
910937
* Returns an Observable that emits the items emitted by each of the Observables emitted by the source
911938
* Observable, one after the other, without interleaving them.

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ void combine(Object value, int index) {
213213
if (value != null && allSourcesFinished) {
214214
queue.offer(combinerSubscriber, latest.clone());
215215
} else
216-
if (value == null && error.get() != null) {
216+
if (value == null && error.get() != null && (o == MISSING || !delayError)) {
217217
done = true; // if this source completed without a value
218218
}
219219
} else {

src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import rx.*;
3030
import rx.Observable;
3131
import rx.Observer;
32+
import rx.exceptions.*;
3233
import rx.functions.*;
3334
import rx.internal.util.RxRingBuffer;
3435
import rx.observers.TestSubscriber;
@@ -954,5 +955,107 @@ public Integer call(Object... args) {
954955
throw new RuntimeException();
955956
}
956957

957-
};
958+
};
959+
960+
@SuppressWarnings("unchecked")
961+
@Test
962+
public void firstJustError() {
963+
TestSubscriber<Integer> ts = TestSubscriber.create();
964+
965+
Observable.combineLatestDelayError(
966+
Arrays.asList(Observable.just(1), Observable.<Integer>error(new TestException())),
967+
new FuncN<Integer>() {
968+
@Override
969+
public Integer call(Object... args) {
970+
return ((Integer)args[0]) + ((Integer)args[1]);
971+
}
972+
}
973+
).subscribe(ts);
974+
975+
ts.assertNoValues();
976+
ts.assertError(TestException.class);
977+
ts.assertNotCompleted();
978+
}
979+
980+
@SuppressWarnings("unchecked")
981+
@Test
982+
public void secondJustError() {
983+
TestSubscriber<Integer> ts = TestSubscriber.create();
984+
985+
Observable.combineLatestDelayError(
986+
Arrays.asList(Observable.<Integer>error(new TestException()), Observable.just(1)),
987+
new FuncN<Integer>() {
988+
@Override
989+
public Integer call(Object... args) {
990+
return ((Integer)args[0]) + ((Integer)args[1]);
991+
}
992+
}
993+
).subscribe(ts);
994+
995+
ts.assertNoValues();
996+
ts.assertError(TestException.class);
997+
ts.assertNotCompleted();
998+
}
999+
1000+
@SuppressWarnings("unchecked")
1001+
@Test
1002+
public void oneErrors() {
1003+
TestSubscriber<Integer> ts = TestSubscriber.create();
1004+
1005+
Observable.combineLatestDelayError(
1006+
Arrays.asList(Observable.just(10).concatWith(Observable.<Integer>error(new TestException())), Observable.just(1)),
1007+
new FuncN<Integer>() {
1008+
@Override
1009+
public Integer call(Object... args) {
1010+
return ((Integer)args[0]) + ((Integer)args[1]);
1011+
}
1012+
}
1013+
).subscribe(ts);
1014+
1015+
ts.assertValues(11);
1016+
ts.assertError(TestException.class);
1017+
ts.assertNotCompleted();
1018+
}
1019+
1020+
@SuppressWarnings("unchecked")
1021+
@Test
1022+
public void twoErrors() {
1023+
TestSubscriber<Integer> ts = TestSubscriber.create();
1024+
1025+
Observable.combineLatestDelayError(
1026+
Arrays.asList(Observable.just(1), Observable.just(10).concatWith(Observable.<Integer>error(new TestException()))),
1027+
new FuncN<Integer>() {
1028+
@Override
1029+
public Integer call(Object... args) {
1030+
return ((Integer)args[0]) + ((Integer)args[1]);
1031+
}
1032+
}
1033+
).subscribe(ts);
1034+
1035+
ts.assertValues(11);
1036+
ts.assertError(TestException.class);
1037+
ts.assertNotCompleted();
1038+
}
1039+
1040+
@SuppressWarnings("unchecked")
1041+
@Test
1042+
public void bothError() {
1043+
TestSubscriber<Integer> ts = TestSubscriber.create();
1044+
1045+
Observable.combineLatestDelayError(
1046+
Arrays.asList(Observable.just(1).concatWith(Observable.<Integer>error(new TestException())),
1047+
Observable.just(10).concatWith(Observable.<Integer>error(new TestException()))),
1048+
new FuncN<Integer>() {
1049+
@Override
1050+
public Integer call(Object... args) {
1051+
return ((Integer)args[0]) + ((Integer)args[1]);
1052+
}
1053+
}
1054+
).subscribe(ts);
1055+
1056+
ts.assertValues(11);
1057+
ts.assertError(CompositeException.class);
1058+
ts.assertNotCompleted();
1059+
}
1060+
9581061
}

0 commit comments

Comments
 (0)