Skip to content

OperatorOnBackpressureDrop request overflow check #2770

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {

@Override
public void request(long n) {
requested.getAndAdd(n);
BackpressureUtils.getAndAddRequest(requested, n);
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import static org.junit.Assert.assertEquals;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Test;

import rx.Observable;
Expand All @@ -27,8 +30,6 @@
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;

public class OperatorOnBackpressureDropTest {

@Test
Expand Down Expand Up @@ -87,6 +88,35 @@ public void onNext(Long t) {
ts.assertNoErrors();
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
}

@Test
public void testRequestOverflow() throws InterruptedException {
final AtomicInteger count = new AtomicInteger();
int n = 10;
range(n).onBackpressureDrop().subscribe(new Subscriber<Long>() {

@Override
public void onStart() {
request(10);
}

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
throw new RuntimeException(e);
}

@Override
public void onNext(Long t) {
count.incrementAndGet();
//cause overflow of requested if not handled properly in onBackpressureDrop operator
request(Long.MAX_VALUE-1);
}});
assertEquals(n, count.get());
}

static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {

Expand All @@ -99,4 +129,22 @@ public void call(Subscriber<? super Long> s) {
}

});

private static final Observable<Long> range(final long n) {
return Observable.create(new OnSubscribe<Long>() {

@Override
public void call(Subscriber<? super Long> s) {
for (long i=0;i < n;i++) {
if (s.isUnsubscribed()) {
break;
}
s.onNext(i);
}
s.onCompleted();
}

});
}

}