Skip to content

Commit 7071fc0

Browse files
committed
OperatorObserveOn should not request more after child is unsubscribed
1 parent 425a6f4 commit 7071fc0

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,9 @@ void pollQueue() {
185185
counter = 1;
186186
long produced = 0;
187187
long r = requested;
188-
while (!child.isUnsubscribed()) {
188+
for (;;) {
189+
if (child.isUnsubscribed())
190+
return;
189191
Throwable error;
190192
if (finished) {
191193
if ((error = this.error) != null) {

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import static org.mockito.Mockito.times;
2727
import static org.mockito.Mockito.verify;
2828

29+
import java.util.ArrayList;
2930
import java.util.Arrays;
31+
import java.util.Collections;
3032
import java.util.Iterator;
3133
import java.util.List;
3234
import java.util.concurrent.CountDownLatch;
@@ -765,4 +767,42 @@ public void onNext(Integer t) {
765767

766768
}
767769

770+
@Test
771+
public void testNoMoreRequestsAfterUnsubscribe() throws InterruptedException {
772+
final CountDownLatch latch = new CountDownLatch(1);
773+
final List<Long> requests = Collections.synchronizedList(new ArrayList<Long>());
774+
Observable.range(1, 1000000)
775+
.doOnRequest(new Action1<Long>() {
776+
777+
@Override
778+
public void call(Long n) {
779+
requests.add(n);
780+
}
781+
})
782+
.observeOn(Schedulers.io())
783+
.subscribe(new Subscriber<Integer>() {
784+
785+
@Override
786+
public void onStart() {
787+
request(1);
788+
}
789+
790+
@Override
791+
public void onCompleted() {
792+
}
793+
794+
@Override
795+
public void onError(Throwable e) {
796+
}
797+
798+
@Override
799+
public void onNext(Integer t) {
800+
unsubscribe();
801+
latch.countDown();
802+
}
803+
});
804+
assertTrue(latch.await(10, TimeUnit.SECONDS));
805+
assertEquals(1, requests.size());
806+
}
807+
768808
}

0 commit comments

Comments
 (0)