Skip to content

Commit 2605ede

Browse files
committed
Merge pull request #2770 from davidmoten/onBackpressureDrop-request-overflow
OperatorOnBackpressureDrop request overflow check
2 parents 3a6ce5a + 9edfdac commit 2605ede

File tree

2 files changed

+51
-3
lines changed

2 files changed

+51
-3
lines changed

src/main/java/rx/internal/operators/OperatorOnBackpressureDrop.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4343

4444
@Override
4545
public void request(long n) {
46-
requested.getAndAdd(n);
46+
BackpressureUtils.getAndAddRequest(requested, n);
4747
}
4848

4949
});

src/test/java/rx/internal/operators/OperatorOnBackpressureDropTest.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919

20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.atomic.AtomicInteger;
22+
2023
import org.junit.Test;
2124

2225
import rx.Observable;
@@ -27,8 +30,6 @@
2730
import rx.observers.TestSubscriber;
2831
import rx.schedulers.Schedulers;
2932

30-
import java.util.concurrent.CountDownLatch;
31-
3233
public class OperatorOnBackpressureDropTest {
3334

3435
@Test
@@ -87,6 +88,35 @@ public void onNext(Long t) {
8788
ts.assertNoErrors();
8889
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
8990
}
91+
92+
@Test
93+
public void testRequestOverflow() throws InterruptedException {
94+
final AtomicInteger count = new AtomicInteger();
95+
int n = 10;
96+
range(n).onBackpressureDrop().subscribe(new Subscriber<Long>() {
97+
98+
@Override
99+
public void onStart() {
100+
request(10);
101+
}
102+
103+
@Override
104+
public void onCompleted() {
105+
}
106+
107+
@Override
108+
public void onError(Throwable e) {
109+
throw new RuntimeException(e);
110+
}
111+
112+
@Override
113+
public void onNext(Long t) {
114+
count.incrementAndGet();
115+
//cause overflow of requested if not handled properly in onBackpressureDrop operator
116+
request(Long.MAX_VALUE-1);
117+
}});
118+
assertEquals(n, count.get());
119+
}
90120

91121
static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {
92122

@@ -99,4 +129,22 @@ public void call(Subscriber<? super Long> s) {
99129
}
100130

101131
});
132+
133+
private static final Observable<Long> range(final long n) {
134+
return Observable.create(new OnSubscribe<Long>() {
135+
136+
@Override
137+
public void call(Subscriber<? super Long> s) {
138+
for (long i=0;i < n;i++) {
139+
if (s.isUnsubscribed()) {
140+
break;
141+
}
142+
s.onNext(i);
143+
}
144+
s.onCompleted();
145+
}
146+
147+
});
148+
}
149+
102150
}

0 commit comments

Comments
 (0)