Skip to content

Commit 6077529

Browse files
committed
Fix buffered objects not propagated downstream in FlowableOnBackpressureBufferStrategy
Fix drop strategy logic in FlowableOnBackpressureBufferStrategy Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage
1 parent f97c50d commit 6077529

File tree

2 files changed

+136
-2
lines changed

2 files changed

+136
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,12 @@ public void onNext(T t) {
108108
if (dq.size() == bufferSize) {
109109
switch (strategy) {
110110
case DROP_LATEST:
111-
dq.poll();
111+
dq.pollLast();
112112
dq.offer(t);
113113
callOnOverflow = true;
114114
break;
115115
case DROP_OLDEST:
116-
dq.pollLast();
116+
dq.poll();
117117
dq.offer(t);
118118
callOnOverflow = true;
119119
break;
@@ -123,6 +123,7 @@ public void onNext(T t) {
123123
}
124124
} else {
125125
dq.offer(t);
126+
drain();
126127
return;
127128
}
128129
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import static io.reactivex.BackpressureOverflowStrategy.DROP_OLDEST;
17+
import static org.junit.Assert.assertEquals;
18+
19+
import java.util.concurrent.CountDownLatch;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import org.junit.Test;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.functions.Action;
26+
import io.reactivex.internal.subscriptions.BooleanSubscription;
27+
import io.reactivex.schedulers.Schedulers;
28+
import io.reactivex.subscribers.DefaultSubscriber;
29+
import io.reactivex.subscribers.TestSubscriber;
30+
import org.reactivestreams.Publisher;
31+
import org.reactivestreams.Subscriber;
32+
33+
public class FlowableOnBackpressureBufferStrategyTest {
34+
35+
private static Action onOverFlow = new Action() {
36+
@Override
37+
public void run() throws Exception {
38+
// Nothing
39+
}
40+
};
41+
42+
43+
@Test(timeout = 2000)
44+
public void testFixBackpressureWithBuffer() throws InterruptedException {
45+
final CountDownLatch l1 = new CountDownLatch(100);
46+
final CountDownLatch l2 = new CountDownLatch(150);
47+
final AtomicInteger droppedCount = new AtomicInteger(0);
48+
Action incrementOnDrop = new Action() {
49+
@Override
50+
public void run() throws Exception {
51+
droppedCount.incrementAndGet();
52+
}
53+
};
54+
TestSubscriber<Long> ts = new TestSubscriber<Long>(new DefaultSubscriber<Long>() {
55+
56+
@Override
57+
protected void onStart() {
58+
}
59+
60+
@Override
61+
public void onComplete() {
62+
}
63+
64+
@Override
65+
public void onError(Throwable e) {
66+
}
67+
68+
@Override
69+
public void onNext(Long t) {
70+
l1.countDown();
71+
l2.countDown();
72+
}
73+
74+
}, 0L);
75+
// this will be ignored
76+
ts.request(100);
77+
// we take 500 so it unsubscribes
78+
Flowable.fromPublisher(infinite.subscribeOn(Schedulers.computation())
79+
.onBackpressureBuffer(1, incrementOnDrop , DROP_OLDEST))
80+
.take(500)
81+
.subscribe(ts);
82+
83+
// it completely ignores the `request(100)` and we get 500
84+
l1.await();
85+
assertEquals(100, ts.values().size());
86+
ts.request(50);
87+
l2.await();
88+
assertEquals(150, ts.values().size());
89+
ts.request(350);
90+
ts.awaitTerminalEvent();
91+
assertEquals(500, ts.values().size());
92+
ts.assertNoErrors();
93+
assertEquals(0, ts.values().get(0).intValue());
94+
assertEquals(499 + droppedCount.get(), ts.values().get(499).intValue());
95+
}
96+
97+
@Test(expected = IllegalArgumentException.class)
98+
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
99+
Flowable.empty().onBackpressureBuffer(-1, onOverFlow , DROP_OLDEST);
100+
}
101+
102+
@Test(expected = IllegalArgumentException.class)
103+
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
104+
Flowable.empty().onBackpressureBuffer(0, onOverFlow , DROP_OLDEST);
105+
}
106+
107+
108+
static final Flowable<Long> infinite = Flowable.unsafeCreate(new Publisher<Long>() {
109+
110+
@Override
111+
public void subscribe(Subscriber<? super Long> s) {
112+
BooleanSubscription bs = new BooleanSubscription();
113+
s.onSubscribe(bs);
114+
long i = 0;
115+
while (!bs.isCancelled()) {
116+
s.onNext(i++);
117+
}
118+
}
119+
120+
});
121+
122+
123+
@Test(expected = IllegalArgumentException.class)
124+
public void fixBackpressureBufferNegativeCapacity() throws InterruptedException {
125+
Flowable.empty().onBackpressureBuffer(-1, onOverFlow , DROP_OLDEST);
126+
}
127+
128+
@Test(expected = IllegalArgumentException.class)
129+
public void fixBackpressureBufferZeroCapacity() throws InterruptedException {
130+
Flowable.empty().onBackpressureBuffer(0, onOverFlow , DROP_OLDEST);
131+
}
132+
133+
}

0 commit comments

Comments
 (0)