Skip to content

Commit a82800c

Browse files
committed
Merge pull request #2548 from davidmoten/request-check
Subscriber.request should throw exception if negative request made
2 parents 19a66b7 + 8fd09fb commit a82800c

File tree

2 files changed

+40
-0
lines changed

2 files changed

+40
-0
lines changed

src/main/java/rx/Subscriber.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,13 @@ public void onStart() {
9292
*
9393
* @param n the maximum number of items you want the Observable to emit to the Subscriber at this time, or
9494
* {@code Long.MAX_VALUE} if you want the Observable to emit items at its own pace
95+
* @throws IllegalArgumentException
96+
* if {@code n} is negative
9597
*/
9698
protected final void request(long n) {
99+
if (n < 0) {
100+
throw new IllegalArgumentException("number requested cannot be negative: " + n);
101+
}
97102
Producer shouldRequest = null;
98103
synchronized (this) {
99104
if (p != null) {

src/test/java/rx/SubscriberTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,13 @@
1616
package rx;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920

21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
2023
import java.util.concurrent.atomic.AtomicInteger;
2124
import java.util.concurrent.atomic.AtomicLong;
25+
import java.util.concurrent.atomic.AtomicReference;
2226

2327
import org.junit.Test;
2428

@@ -419,4 +423,35 @@ public void onNext(Integer t) {
419423

420424
assertEquals(1, c.get());
421425
}
426+
427+
@Test
428+
public void testNegativeRequestThrowsIllegalArgumentException() throws InterruptedException {
429+
final CountDownLatch latch = new CountDownLatch(1);
430+
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
431+
Observable.just(1,2,3,4).subscribe(new Subscriber<Integer>() {
432+
433+
@Override
434+
public void onStart() {
435+
request(1);
436+
}
437+
438+
@Override
439+
public void onCompleted() {
440+
441+
}
442+
443+
@Override
444+
public void onError(Throwable e) {
445+
exception.set(e);
446+
latch.countDown();
447+
}
448+
449+
@Override
450+
public void onNext(Integer t) {
451+
request(-1);
452+
request(1);
453+
}});
454+
assertTrue(latch.await(10, TimeUnit.SECONDS));
455+
assertTrue(exception.get() instanceof IllegalArgumentException);
456+
}
422457
}

0 commit comments

Comments
 (0)