Skip to content

Commit f6ca9ee

Browse files
committed
prevent request overflow in OperatorObserveOn and add unit test that fails on original codebase but passes with change
1 parent 51f497d commit f6ca9ee

File tree

2 files changed

+54
-6
lines changed

2 files changed

+54
-6
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,22 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Queue;
19-
import java.util.concurrent.atomic.*;
19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2021

2122
import rx.Observable.Operator;
22-
import rx.*;
23+
import rx.Producer;
24+
import rx.Scheduler;
25+
import rx.Subscriber;
26+
import rx.Subscription;
2327
import rx.exceptions.MissingBackpressureException;
2428
import rx.functions.Action0;
25-
import rx.internal.util.*;
26-
import rx.internal.util.unsafe.*;
27-
import rx.schedulers.*;
29+
import rx.internal.util.RxRingBuffer;
30+
import rx.internal.util.SynchronizedQueue;
31+
import rx.internal.util.unsafe.SpscArrayQueue;
32+
import rx.internal.util.unsafe.UnsafeAccess;
33+
import rx.schedulers.ImmediateScheduler;
34+
import rx.schedulers.TrampolineScheduler;
2835

2936
/**
3037
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
@@ -96,7 +103,7 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
96103

97104
@Override
98105
public void request(long n) {
99-
REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
106+
BackpressureUtils.getAndAddRequest(REQUESTED, ObserveOnSubscriber.this, n);
100107
schedule();
101108
}
102109

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,4 +724,45 @@ public Long call(Long t1, Integer t2) {
724724
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
725725
}
726726

727+
@Test
728+
public void testRequestOverflow() throws InterruptedException {
729+
730+
final CountDownLatch latch = new CountDownLatch(1);
731+
final AtomicInteger count = new AtomicInteger();
732+
Observable.range(1, 100).observeOn(Schedulers.computation())
733+
.subscribe(new Subscriber<Integer>() {
734+
735+
boolean first = true;
736+
737+
@Override
738+
public void onStart() {
739+
request(2);
740+
}
741+
742+
@Override
743+
public void onCompleted() {
744+
latch.countDown();
745+
}
746+
747+
@Override
748+
public void onError(Throwable e) {
749+
750+
}
751+
752+
@Override
753+
public void onNext(Integer t) {
754+
count.incrementAndGet();
755+
if (first) {
756+
request(Long.MAX_VALUE - 1);
757+
request(Long.MAX_VALUE - 1);
758+
request(10);
759+
first = false;
760+
}
761+
}
762+
});
763+
assertTrue(latch.await(10, TimeUnit.SECONDS));
764+
assertEquals(100, count.get());
765+
766+
}
767+
727768
}

0 commit comments

Comments
 (0)