Skip to content

Commit 24dadf1

Browse files
committed
Merge pull request #2921 from davidmoten/observe-on-request-overflow
OperatorObserveOn - handle request overflow correctly
2 parents aeee037 + fcfa4e8 commit 24dadf1

File tree

2 files changed

+62
-7
lines changed

2 files changed

+62
-7
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,22 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Queue;
19-
import java.util.concurrent.atomic.*;
19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2021

2122
import rx.Observable.Operator;
22-
import rx.*;
23+
import rx.Producer;
24+
import rx.Scheduler;
25+
import rx.Subscriber;
26+
import rx.Subscription;
2327
import rx.exceptions.MissingBackpressureException;
2428
import rx.functions.Action0;
25-
import rx.internal.util.*;
26-
import rx.internal.util.unsafe.*;
27-
import rx.schedulers.*;
29+
import rx.internal.util.RxRingBuffer;
30+
import rx.internal.util.SynchronizedQueue;
31+
import rx.internal.util.unsafe.SpscArrayQueue;
32+
import rx.internal.util.unsafe.UnsafeAccess;
33+
import rx.schedulers.ImmediateScheduler;
34+
import rx.schedulers.TrampolineScheduler;
2835

2936
/**
3037
* Delivers events on the specified {@code Scheduler} asynchronously via an unbounded buffer.
@@ -54,7 +61,9 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
5461
// avoid overhead, execute directly
5562
return child;
5663
} else {
57-
return new ObserveOnSubscriber<T>(scheduler, child);
64+
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child);
65+
parent.init();
66+
return parent;
5867
}
5968
}
6069

@@ -91,12 +100,17 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child) {
91100
queue = new SynchronizedQueue<Object>(RxRingBuffer.SIZE);
92101
}
93102
this.scheduledUnsubscribe = new ScheduledUnsubscribe(recursiveScheduler);
103+
}
104+
105+
void init() {
106+
// don't want this code in the constructor because `this` can escape through the
107+
// setProducer call
94108
child.add(scheduledUnsubscribe);
95109
child.setProducer(new Producer() {
96110

97111
@Override
98112
public void request(long n) {
99-
REQUESTED.getAndAdd(ObserveOnSubscriber.this, n);
113+
BackpressureUtils.getAndAddRequest(REQUESTED, ObserveOnSubscriber.this, n);
100114
schedule();
101115
}
102116

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,4 +724,45 @@ public Long call(Long t1, Integer t2) {
724724
assertEquals(MissingBackpressureException.class, ts.getOnErrorEvents().get(0).getClass());
725725
}
726726

727+
@Test
728+
public void testRequestOverflow() throws InterruptedException {
729+
730+
final CountDownLatch latch = new CountDownLatch(1);
731+
final AtomicInteger count = new AtomicInteger();
732+
Observable.range(1, 100).observeOn(Schedulers.computation())
733+
.subscribe(new Subscriber<Integer>() {
734+
735+
boolean first = true;
736+
737+
@Override
738+
public void onStart() {
739+
request(2);
740+
}
741+
742+
@Override
743+
public void onCompleted() {
744+
latch.countDown();
745+
}
746+
747+
@Override
748+
public void onError(Throwable e) {
749+
750+
}
751+
752+
@Override
753+
public void onNext(Integer t) {
754+
count.incrementAndGet();
755+
if (first) {
756+
request(Long.MAX_VALUE - 1);
757+
request(Long.MAX_VALUE - 1);
758+
request(10);
759+
first = false;
760+
}
761+
}
762+
});
763+
assertTrue(latch.await(10, TimeUnit.SECONDS));
764+
assertEquals(100, count.get());
765+
766+
}
767+
727768
}

0 commit comments

Comments
 (0)