Skip to content

Commit 81badc3

Browse files
authored
Fix FlowableOnBackpressureBufferStrategy (#4441)
* Fix buffered objects not propagated downstream in FlowableOnBackpressureBufferStrategy Fix drop strategy logic in FlowableOnBackpressureBufferStrategy Add unit test for FlowableOnBackpressureBufferStrategy, copied from FlowableOnBackpressureBufferTest, there is still some work needed to have a better coverage * Fix FlowableOnBackpressureBufferStrategy Move the call to drain out of the synchronized block Cleaned up unit tests to follow recommendations from akarnokd
2 parents a856572 + 1617087 commit 81badc3

File tree

2 files changed

+137
-4
lines changed

2 files changed

+137
-4
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -103,27 +103,28 @@ public void onNext(T t) {
103103
return;
104104
}
105105
boolean callOnOverflow = false;
106+
boolean callError = false;
106107
Deque<T> dq = deque;
107108
synchronized (dq) {
108109
if (dq.size() == bufferSize) {
109110
switch (strategy) {
110111
case DROP_LATEST:
111-
dq.poll();
112+
dq.pollLast();
112113
dq.offer(t);
113114
callOnOverflow = true;
114115
break;
115116
case DROP_OLDEST:
116-
dq.pollLast();
117+
dq.poll();
117118
dq.offer(t);
118119
callOnOverflow = true;
119120
break;
120121
default:
121122
// signal error
123+
callError = true;
122124
break;
123125
}
124126
} else {
125127
dq.offer(t);
126-
return;
127128
}
128129
}
129130

@@ -137,9 +138,11 @@ public void onNext(T t) {
137138
onError(ex);
138139
}
139140
}
140-
} else {
141+
} else if(callError) {
141142
s.cancel();
142143
onError(new MissingBackpressureException());
144+
} else {
145+
drain();
143146
}
144147
}
145148

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import io.reactivex.Flowable;
17+
import io.reactivex.functions.Action;
18+
import io.reactivex.internal.subscriptions.BooleanSubscription;
19+
import io.reactivex.subscribers.DefaultSubscriber;
20+
import io.reactivex.subscribers.TestSubscriber;
21+
import org.junit.Test;
22+
import org.reactivestreams.Publisher;
23+
import org.reactivestreams.Subscriber;
24+
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
import static io.reactivex.BackpressureOverflowStrategy.DROP_LATEST;
28+
import static io.reactivex.BackpressureOverflowStrategy.DROP_OLDEST;
29+
import static io.reactivex.internal.functions.Functions.EMPTY_ACTION;
30+
import static org.junit.Assert.assertEquals;
31+
32+
public class FlowableOnBackpressureBufferStrategyTest {
33+
34+
@Test(timeout = 2000)
35+
public void backpressureWithBufferDropOldest() throws InterruptedException {
36+
int bufferSize = 3;
37+
final AtomicInteger droppedCount = new AtomicInteger(0);
38+
Action incrementOnDrop = new Action() {
39+
@Override
40+
public void run() throws Exception {
41+
droppedCount.incrementAndGet();
42+
}
43+
};
44+
TestSubscriber<Long> ts = createTestSubscriber();
45+
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_OLDEST))
46+
.subscribe(ts);
47+
// we request 10 but only 3 should come from the buffer
48+
ts.request(10);
49+
ts.awaitTerminalEvent();
50+
assertEquals(bufferSize, ts.values().size());
51+
ts.assertNoErrors();
52+
assertEquals(497, ts.values().get(0).intValue());
53+
assertEquals(498, ts.values().get(1).intValue());
54+
assertEquals(499, ts.values().get(2).intValue());
55+
assertEquals(droppedCount.get(), 500 - bufferSize);
56+
}
57+
58+
private TestSubscriber<Long> createTestSubscriber() {
59+
return new TestSubscriber<Long>(new DefaultSubscriber<Long>() {
60+
61+
@Override
62+
protected void onStart() {
63+
}
64+
65+
@Override
66+
public void onComplete() {
67+
}
68+
69+
@Override
70+
public void onError(Throwable e) {
71+
}
72+
73+
@Override
74+
public void onNext(Long t) {
75+
}
76+
77+
}, 0L);
78+
}
79+
80+
@Test(timeout = 2000)
81+
public void backpressureWithBufferDropLatest() throws InterruptedException {
82+
int bufferSize = 3;
83+
final AtomicInteger droppedCount = new AtomicInteger(0);
84+
Action incrementOnDrop = new Action() {
85+
@Override
86+
public void run() throws Exception {
87+
droppedCount.incrementAndGet();
88+
}
89+
};
90+
TestSubscriber<Long> ts = createTestSubscriber();
91+
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_LATEST))
92+
.subscribe(ts);
93+
// we request 10 but only 3 should come from the buffer
94+
ts.request(10);
95+
ts.awaitTerminalEvent();
96+
assertEquals(bufferSize, ts.values().size());
97+
ts.assertNoErrors();
98+
assertEquals(0, ts.values().get(0).intValue());
99+
assertEquals(1, ts.values().get(1).intValue());
100+
assertEquals(499, ts.values().get(2).intValue());
101+
assertEquals(droppedCount.get(), 500 - bufferSize);
102+
}
103+
104+
private static final Flowable<Long> send500ValuesAndComplete = Flowable.unsafeCreate(new Publisher<Long>() {
105+
@Override
106+
public void subscribe(Subscriber<? super Long> s) {
107+
BooleanSubscription bs = new BooleanSubscription();
108+
s.onSubscribe(bs);
109+
long i = 0;
110+
while (!bs.isCancelled() && i < 500) {
111+
s.onNext(i++);
112+
}
113+
if(!bs.isCancelled()){
114+
s.onComplete();
115+
}
116+
}
117+
});
118+
119+
120+
@Test(expected = IllegalArgumentException.class)
121+
public void backpressureBufferNegativeCapacity() throws InterruptedException {
122+
Flowable.empty().onBackpressureBuffer(-1, EMPTY_ACTION , DROP_OLDEST);
123+
}
124+
125+
@Test(expected = IllegalArgumentException.class)
126+
public void backpressureBufferZeroCapacity() throws InterruptedException {
127+
Flowable.empty().onBackpressureBuffer(0, EMPTY_ACTION , DROP_OLDEST);
128+
}
129+
130+
}

0 commit comments

Comments
 (0)