Skip to content

Commit 19a66b7

Browse files
committed
Merge pull request #2543 from davidmoten/merge-request-overflow
OperatorMerge handle request overflow
2 parents 18def88 + 5348ebf commit 19a66b7

File tree

2 files changed

+42
-1
lines changed

2 files changed

+42
-1
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -545,7 +545,16 @@ public void request(long n) {
545545
if (n == Long.MAX_VALUE) {
546546
requested = Long.MAX_VALUE;
547547
} else {
548-
REQUESTED.getAndAdd(this, n);
548+
// add n to requested but check for overflow
549+
while (true) {
550+
long current = REQUESTED.get(this);
551+
long next = current + n;
552+
//check for overflow
553+
if (next < 0)
554+
next = Long.MAX_VALUE;
555+
if (REQUESTED.compareAndSet(this, current, next))
556+
break;
557+
}
549558
if (ms.drainQueuesIfNeeded()) {
550559
boolean sendComplete = false;
551560
synchronized (ms) {

src/test/java/rx/internal/operators/OperatorMergeTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1183,6 +1183,38 @@ public void call() {
11831183
assertTrue(a);
11841184
//}
11851185
}
1186+
1187+
@Test
1188+
public void testMergeRequestOverflow() throws InterruptedException {
1189+
//do a non-trivial merge so that future optimisations with EMPTY don't invalidate this test
1190+
Observable<Integer> o = Observable.from(Arrays.asList(1,2)).mergeWith(Observable.from(Arrays.asList(3,4)));
1191+
final int expectedCount = 4;
1192+
final CountDownLatch latch = new CountDownLatch(expectedCount);
1193+
o.subscribeOn(Schedulers.computation()).subscribe(new Subscriber<Integer>() {
1194+
1195+
@Override
1196+
public void onStart() {
1197+
request(1);
1198+
}
1199+
1200+
@Override
1201+
public void onCompleted() {
1202+
//ignore
1203+
}
1204+
1205+
@Override
1206+
public void onError(Throwable e) {
1207+
throw new RuntimeException(e);
1208+
}
1209+
1210+
@Override
1211+
public void onNext(Integer t) {
1212+
latch.countDown();
1213+
request(2);
1214+
request(Long.MAX_VALUE-1);
1215+
}});
1216+
assertTrue(latch.await(10, TimeUnit.SECONDS));
1217+
}
11861218

11871219
private static Action1<Integer> printCount() {
11881220
return new Action1<Integer>() {

0 commit comments

Comments
 (0)