Skip to content

Commit 73715bf

Browse files
Merge pull request #1957 from zsxwing/fix-scan
Fix 'request(0)' issue in Scan
2 parents ba85468 + 29c1b6e commit 73715bf

File tree

2 files changed

+59
-13
lines changed

2 files changed

+59
-13
lines changed

src/main/java/rx/internal/operators/OperatorScan.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -147,19 +147,17 @@ public void request(long n) {
147147
if (once.compareAndSet(false, true)) {
148148
if (initialValue == NO_INITIAL_VALUE || n == Long.MAX_VALUE) {
149149
producer.request(n);
150+
} else if (n == 1) {
151+
excessive.set(true);
152+
producer.request(1); // request at least 1
150153
} else {
151-
if (n == Long.MAX_VALUE) {
152-
producer.request(Long.MAX_VALUE);
153-
} else if (n == 1) {
154-
excessive.set(true);
155-
producer.request(1); // request at least 1
156-
} else {
157-
producer.request(n - 1);
158-
}
154+
// n != Long.MAX_VALUE && n != 1
155+
producer.request(n - 1);
159156
}
160157
} else {
161158
// pass-thru after first time
162-
if (excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) {
159+
if (n > 1 // avoid to request 0
160+
&& excessive.compareAndSet(true, false) && n != Long.MAX_VALUE) {
163161
producer.request(n - 1);
164162
} else {
165163
producer.request(n);

src/test/java/rx/internal/operators/OperatorScanTest.java

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,22 @@
2020
import static org.mockito.Matchers.any;
2121
import static org.mockito.Matchers.anyInt;
2222
import static org.mockito.Matchers.anyString;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.times;
26-
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.*;
2724

2825
import java.util.ArrayList;
2926
import java.util.Arrays;
3027
import java.util.List;
28+
import java.util.concurrent.atomic.AtomicBoolean;
3129
import java.util.concurrent.atomic.AtomicInteger;
30+
import java.util.concurrent.atomic.AtomicReference;
3231

3332
import org.junit.Before;
3433
import org.junit.Test;
3534
import org.mockito.MockitoAnnotations;
3635

3736
import rx.Observable;
3837
import rx.Observer;
38+
import rx.Producer;
3939
import rx.Subscriber;
4040
import rx.functions.Action2;
4141
import rx.functions.Func0;
@@ -312,4 +312,52 @@ public Integer call(Integer t1, Integer t2) {
312312
subscriber.assertTerminalEvent();
313313
subscriber.assertNoErrors();
314314
}
315+
316+
@Test
317+
public void testScanShouldNotRequestZero() {
318+
final AtomicReference<Producer> producer = new AtomicReference<Producer>();
319+
Observable<Integer> o = Observable.create(new Observable.OnSubscribe<Integer>() {
320+
@Override
321+
public void call(final Subscriber subscriber) {
322+
Producer p = spy(new Producer() {
323+
324+
private AtomicBoolean requested = new AtomicBoolean(false);
325+
326+
@Override
327+
public void request(long n) {
328+
if (requested.compareAndSet(false, true)) {
329+
subscriber.onNext(1);
330+
} else {
331+
subscriber.onCompleted();
332+
}
333+
}
334+
});
335+
producer.set(p);
336+
subscriber.setProducer(p);
337+
}
338+
}).scan(100, new Func2<Integer, Integer, Integer>() {
339+
340+
@Override
341+
public Integer call(Integer t1, Integer t2) {
342+
return t1 + t2;
343+
}
344+
345+
});
346+
347+
o.subscribe(new TestSubscriber<Integer>() {
348+
349+
@Override
350+
public void onStart() {
351+
request(1);
352+
}
353+
354+
@Override
355+
public void onNext(Integer integer) {
356+
request(1);
357+
}
358+
});
359+
360+
verify(producer.get(), never()).request(0);
361+
verify(producer.get(), times(2)).request(1);
362+
}
315363
}

0 commit comments

Comments
 (0)