Skip to content

Commit d73b968

Browse files
committed
OperatorObserveOn should not request more after child is unsubscribed
1 parent 425a6f4 commit d73b968

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ void pollQueue() {
185185
counter = 1;
186186
long produced = 0;
187187
long r = requested;
188-
while (!child.isUnsubscribed()) {
188+
boolean isUnsubscribed;
189+
while (!(isUnsubscribed = child.isUnsubscribed())) {
189190
Throwable error;
190191
if (finished) {
191192
if ((error = this.error) != null) {
@@ -214,6 +215,8 @@ void pollQueue() {
214215
break;
215216
}
216217
}
218+
if (isUnsubscribed)
219+
return;
217220
if (produced > 0 && requested != Long.MAX_VALUE) {
218221
REQUESTED.addAndGet(this, -produced);
219222
}

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import static org.mockito.Mockito.times;
2727
import static org.mockito.Mockito.verify;
2828

29+
import java.util.ArrayList;
2930
import java.util.Arrays;
31+
import java.util.Collections;
3032
import java.util.Iterator;
3133
import java.util.List;
3234
import java.util.concurrent.CountDownLatch;
@@ -765,4 +767,42 @@ public void onNext(Integer t) {
765767

766768
}
767769

770+
@Test
771+
public void testNoMoreRequestsAfterUnsubscribe() throws InterruptedException {
772+
final CountDownLatch latch = new CountDownLatch(1);
773+
final List<Long> requests = Collections.synchronizedList(new ArrayList<Long>());
774+
Observable.range(1, 1000000)
775+
.doOnRequest(new Action1<Long>() {
776+
777+
@Override
778+
public void call(Long n) {
779+
requests.add(n);
780+
}
781+
})
782+
.observeOn(Schedulers.io())
783+
.subscribe(new Subscriber<Integer>() {
784+
785+
@Override
786+
public void onStart() {
787+
request(1);
788+
}
789+
790+
@Override
791+
public void onCompleted() {
792+
}
793+
794+
@Override
795+
public void onError(Throwable e) {
796+
}
797+
798+
@Override
799+
public void onNext(Integer t) {
800+
unsubscribe();
801+
latch.countDown();
802+
}
803+
});
804+
assertTrue(latch.await(10, TimeUnit.SECONDS));
805+
assertEquals(1, requests.size());
806+
}
807+
768808
}

0 commit comments

Comments
 (0)