Skip to content

Commit 01ceedc

Browse files
committed
TakeLastQueueProducer add request overflow check
1 parent 9fb5614 commit 01ceedc

File tree

2 files changed

+31
-1
lines changed

2 files changed

+31
-1
lines changed

src/main/java/rx/internal/operators/TakeLastQueueProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void request(long n) {
5555
if (n == Long.MAX_VALUE) {
5656
_c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE);
5757
} else {
58-
_c = REQUESTED_UPDATER.getAndAdd(this, n);
58+
_c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
5959
}
6060
if (!emittingStarted) {
6161
// we haven't started yet, so record what was requested and return

src/test/java/rx/internal/operators/OperatorTakeLastTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import static org.mockito.Mockito.times;
2424
import static org.mockito.Mockito.verify;
2525

26+
import java.util.ArrayList;
2627
import java.util.Arrays;
28+
import java.util.List;
2729
import java.util.concurrent.atomic.AtomicInteger;
2830

2931
import org.junit.Test;
@@ -293,4 +295,32 @@ public void onNext(Integer integer) {
293295
});
294296
assertEquals(1,count.get());
295297
}
298+
299+
@Test(timeout=10000)
300+
public void testRequestOverflow() {
301+
final List<Integer> list = new ArrayList<Integer>();
302+
Observable.range(1, 100).takeLast(50).subscribe(new Subscriber<Integer>() {
303+
304+
@Override
305+
public void onStart() {
306+
request(2);
307+
}
308+
309+
@Override
310+
public void onCompleted() {
311+
312+
}
313+
314+
@Override
315+
public void onError(Throwable e) {
316+
317+
}
318+
319+
@Override
320+
public void onNext(Integer t) {
321+
list.add(t);
322+
request(Long.MAX_VALUE-1);
323+
}});
324+
assertEquals(50, list.size());
325+
}
296326
}

0 commit comments

Comments
 (0)