Skip to content

Commit 3a6ce5a

Browse files
committed
Merge pull request #2769 from davidmoten/combineLatest-request-overflow
OperatorCombineLatest request overflow check
2 parents 14ed8d4 + 2043094 commit 3a6ce5a

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ public MultiSourceProducer(final Subscriber<? super R> child, final List<? exten
110110

111111
@Override
112112
public void request(long n) {
113-
requested.getAndAdd(n);
113+
BackpressureUtils.getAndAddRequest(requested, n);
114114
if (!started.get() && started.compareAndSet(false, true)) {
115115
/*
116116
* NOTE: this logic will ONLY work if we don't have more sources than the size of the buffer.

src/test/java/rx/internal/operators/OnSubscribeCombineLatestTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,5 +851,40 @@ public Long call(Long t1, Integer t2) {
851851

852852
assertEquals(SIZE, count.get());
853853
}
854+
855+
@Test(timeout=10000)
856+
public void testCombineLatestRequestOverflow() throws InterruptedException {
857+
List<Observable<Integer>> sources = Arrays.asList(Observable.from(Arrays.asList(1,2,3,4)), Observable.from(Arrays.asList(5,6,7,8)));
858+
Observable<Integer> o = Observable.combineLatest(sources,new FuncN<Integer>() {
859+
@Override
860+
public Integer call(Object... args) {
861+
return (Integer) args[0];
862+
}});
863+
//should get at least 4
864+
final CountDownLatch latch = new CountDownLatch(4);
865+
o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {
866+
867+
@Override
868+
public void onStart() {
869+
request(2);
870+
}
871+
872+
@Override
873+
public void onCompleted() {
874+
//ignore
875+
}
876+
877+
@Override
878+
public void onError(Throwable e) {
879+
throw new RuntimeException(e);
880+
}
881+
882+
@Override
883+
public void onNext(Integer t) {
884+
latch.countDown();
885+
request(Long.MAX_VALUE-1);
886+
}});
887+
assertTrue(latch.await(10, TimeUnit.SECONDS));
888+
}
854889

855890
}

0 commit comments

Comments
 (0)