Skip to content

Commit 1617087

Browse files
committed
Fix FlowableOnBackpressureBufferStrategy
Move the call to drain out of the synchronized block Cleaned up unit tests to follow recommendations from akarnokd
1 parent 6077529 commit 1617087

File tree

2 files changed

+64
-65
lines changed

2 files changed

+64
-65
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public void onNext(T t) {
103103
return;
104104
}
105105
boolean callOnOverflow = false;
106+
boolean callError = false;
106107
Deque<T> dq = deque;
107108
synchronized (dq) {
108109
if (dq.size() == bufferSize) {
@@ -119,12 +120,11 @@ public void onNext(T t) {
119120
break;
120121
default:
121122
// signal error
123+
callError = true;
122124
break;
123125
}
124126
} else {
125127
dq.offer(t);
126-
drain();
127-
return;
128128
}
129129
}
130130

@@ -138,9 +138,11 @@ public void onNext(T t) {
138138
onError(ex);
139139
}
140140
}
141-
} else {
141+
} else if(callError) {
142142
s.cancel();
143143
onError(new MissingBackpressureException());
144+
} else {
145+
drain();
144146
}
145147
}
146148

Lines changed: 59 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,67 @@
11
/**
22
* Copyright 2016 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
55
* compliance with the License. You may obtain a copy of the License at
6-
*
6+
*
77
* http://www.apache.org/licenses/LICENSE-2.0
8-
*
8+
*
99
* Unless required by applicable law or agreed to in writing, software distributed under the License is
1010
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import static io.reactivex.BackpressureOverflowStrategy.DROP_OLDEST;
17-
import static org.junit.Assert.assertEquals;
18-
19-
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.atomic.AtomicInteger;
21-
22-
import org.junit.Test;
23-
2416
import io.reactivex.Flowable;
2517
import io.reactivex.functions.Action;
2618
import io.reactivex.internal.subscriptions.BooleanSubscription;
27-
import io.reactivex.schedulers.Schedulers;
2819
import io.reactivex.subscribers.DefaultSubscriber;
2920
import io.reactivex.subscribers.TestSubscriber;
21+
import org.junit.Test;
3022
import org.reactivestreams.Publisher;
3123
import org.reactivestreams.Subscriber;
3224

33-
public class FlowableOnBackpressureBufferStrategyTest {
25+
import java.util.concurrent.atomic.AtomicInteger;
3426

35-
private static Action onOverFlow = new Action() {
36-
@Override
37-
public void run() throws Exception {
38-
// Nothing
39-
}
40-
};
27+
import static io.reactivex.BackpressureOverflowStrategy.DROP_LATEST;
28+
import static io.reactivex.BackpressureOverflowStrategy.DROP_OLDEST;
29+
import static io.reactivex.internal.functions.Functions.EMPTY_ACTION;
30+
import static org.junit.Assert.assertEquals;
4131

32+
public class FlowableOnBackpressureBufferStrategyTest {
4233

4334
@Test(timeout = 2000)
44-
public void testFixBackpressureWithBuffer() throws InterruptedException {
45-
final CountDownLatch l1 = new CountDownLatch(100);
46-
final CountDownLatch l2 = new CountDownLatch(150);
35+
public void backpressureWithBufferDropOldest() throws InterruptedException {
36+
int bufferSize = 3;
4737
final AtomicInteger droppedCount = new AtomicInteger(0);
4838
Action incrementOnDrop = new Action() {
4939
@Override
5040
public void run() throws Exception {
5141
droppedCount.incrementAndGet();
5242
}
5343
};
54-
TestSubscriber<Long> ts = new TestSubscriber<Long>(new DefaultSubscriber<Long>() {
44+
TestSubscriber<Long> ts = createTestSubscriber();
45+
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_OLDEST))
46+
.subscribe(ts);
47+
// we request 10 but only 3 should come from the buffer
48+
ts.request(10);
49+
ts.awaitTerminalEvent();
50+
assertEquals(bufferSize, ts.values().size());
51+
ts.assertNoErrors();
52+
assertEquals(497, ts.values().get(0).intValue());
53+
assertEquals(498, ts.values().get(1).intValue());
54+
assertEquals(499, ts.values().get(2).intValue());
55+
assertEquals(droppedCount.get(), 500 - bufferSize);
56+
}
57+
58+
private TestSubscriber<Long> createTestSubscriber() {
59+
return new TestSubscriber<Long>(new DefaultSubscriber<Long>() {
5560

5661
@Override
5762
protected void onStart() {
5863
}
59-
64+
6065
@Override
6166
public void onComplete() {
6267
}
@@ -67,67 +72,59 @@ public void onError(Throwable e) {
6772

6873
@Override
6974
public void onNext(Long t) {
70-
l1.countDown();
71-
l2.countDown();
7275
}
7376

7477
}, 0L);
75-
// this will be ignored
76-
ts.request(100);
77-
// we take 500 so it unsubscribes
78-
Flowable.fromPublisher(infinite.subscribeOn(Schedulers.computation())
79-
.onBackpressureBuffer(1, incrementOnDrop , DROP_OLDEST))
80-
.take(500)
81-
.subscribe(ts);
82-
83-
// it completely ignores the `request(100)` and we get 500
84-
l1.await();
85-
assertEquals(100, ts.values().size());
86-
ts.request(50);
87-
l2.await();
88-
assertEquals(150, ts.values().size());
89-
ts.request(350);
78+
}
79+
80+
@Test(timeout = 2000)
81+
public void backpressureWithBufferDropLatest() throws InterruptedException {
82+
int bufferSize = 3;
83+
final AtomicInteger droppedCount = new AtomicInteger(0);
84+
Action incrementOnDrop = new Action() {
85+
@Override
86+
public void run() throws Exception {
87+
droppedCount.incrementAndGet();
88+
}
89+
};
90+
TestSubscriber<Long> ts = createTestSubscriber();
91+
Flowable.fromPublisher(send500ValuesAndComplete.onBackpressureBuffer(bufferSize, incrementOnDrop, DROP_LATEST))
92+
.subscribe(ts);
93+
// we request 10 but only 3 should come from the buffer
94+
ts.request(10);
9095
ts.awaitTerminalEvent();
91-
assertEquals(500, ts.values().size());
96+
assertEquals(bufferSize, ts.values().size());
9297
ts.assertNoErrors();
9398
assertEquals(0, ts.values().get(0).intValue());
94-
assertEquals(499 + droppedCount.get(), ts.values().get(499).intValue());
99+
assertEquals(1, ts.values().get(1).intValue());
100+
assertEquals(499, ts.values().get(2).intValue());
101+
assertEquals(droppedCount.get(), 500 - bufferSize);
95102
}
96103

97-
@Test(expected = IllegalArgumentException.class)
98-
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
99-
Flowable.empty().onBackpressureBuffer(-1, onOverFlow , DROP_OLDEST);
100-
}
101-
102-
@Test(expected = IllegalArgumentException.class)
103-
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
104-
Flowable.empty().onBackpressureBuffer(0, onOverFlow , DROP_OLDEST);
105-
}
106-
107-
108-
static final Flowable<Long> infinite = Flowable.unsafeCreate(new Publisher<Long>() {
109-
104+
private static final Flowable<Long> send500ValuesAndComplete = Flowable.unsafeCreate(new Publisher<Long>() {
110105
@Override
111106
public void subscribe(Subscriber<? super Long> s) {
112107
BooleanSubscription bs = new BooleanSubscription();
113108
s.onSubscribe(bs);
114109
long i = 0;
115-
while (!bs.isCancelled()) {
110+
while (!bs.isCancelled() && i < 500) {
116111
s.onNext(i++);
117112
}
113+
if(!bs.isCancelled()){
114+
s.onComplete();
115+
}
118116
}
119-
120117
});
121118

122119

123120
@Test(expected = IllegalArgumentException.class)
124-
public void fixBackpressureBufferNegativeCapacity() throws InterruptedException {
125-
Flowable.empty().onBackpressureBuffer(-1, onOverFlow , DROP_OLDEST);
121+
public void backpressureBufferNegativeCapacity() throws InterruptedException {
122+
Flowable.empty().onBackpressureBuffer(-1, EMPTY_ACTION , DROP_OLDEST);
126123
}
127124

128125
@Test(expected = IllegalArgumentException.class)
129-
public void fixBackpressureBufferZeroCapacity() throws InterruptedException {
130-
Flowable.empty().onBackpressureBuffer(0, onOverFlow , DROP_OLDEST);
126+
public void backpressureBufferZeroCapacity() throws InterruptedException {
127+
Flowable.empty().onBackpressureBuffer(0, EMPTY_ACTION , DROP_OLDEST);
131128
}
132129

133130
}

0 commit comments

Comments
 (0)