Skip to content

Commit e4598a5

Browse files
committed
OnBackpressureBuffer: DROP_LATEST and DROP_OLDEST
Introduce a new interface BackpressureOverflow.Strategy that allows implementing different handlers for an overflow situation. This patch adds three implementations: - ON_OVERFLOW_ERROR remains the default as the existing implementation. - ON_OVERFLOW_DROP_LATEST will drop newly produced items after the buffer fills up. - ON_OVERFLOW_DROP_OLDEST will drop the oldest elements in the buffer, making room for newer ones. The default strategy remains ON_OVERFLOW_ERROR. In all cases, a drop will result in a notification to the producer by invoking the onOverflow callback. None of the two new behaviours (ON_OVERFLOW_DROP_*) will unsubscribe from the source nor onError. Fixes: #3233
1 parent f7321d2 commit e4598a5

File tree

5 files changed

+308
-48
lines changed

5 files changed

+308
-48
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import rx.annotations.Experimental;
19+
import rx.exceptions.MissingBackpressureException;
20+
21+
/**
22+
* Generic strategy and default implementations to deal with backpressure buffer overflows.
23+
*/
24+
@Experimental
25+
public final class BackpressureOverflow {
26+
27+
public interface Strategy {
28+
29+
/**
30+
* Whether the Backpressure manager should attempt to drop the oldest item, or simply
31+
* drop the item currently causing backpressure.
32+
*
33+
* @return true to request drop of the oldest item, false to drop the newest.
34+
* @throws MissingBackpressureException
35+
*/
36+
boolean mayAttemptDrop() throws MissingBackpressureException;
37+
}
38+
39+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DEFAULT = Error.INSTANCE;
40+
@SuppressWarnings("unused")
41+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_ERROR = Error.INSTANCE;
42+
@SuppressWarnings("unused")
43+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_OLDEST = DropOldest.INSTANCE;
44+
@SuppressWarnings("unused")
45+
public static final BackpressureOverflow.Strategy ON_OVERFLOW_DROP_LATEST = DropLatest.INSTANCE;
46+
47+
/**
48+
* Drop oldest items from the buffer making room for newer ones.
49+
*/
50+
static class DropOldest implements BackpressureOverflow.Strategy {
51+
static final DropOldest INSTANCE = new DropOldest();
52+
53+
private DropOldest() {}
54+
55+
@Override
56+
public boolean mayAttemptDrop() {
57+
return true;
58+
}
59+
}
60+
61+
/**
62+
* Drop most recent items, but not {@code onError} nor unsubscribe from source
63+
* (as {code OperatorOnBackpressureDrop}).
64+
*/
65+
static class DropLatest implements BackpressureOverflow.Strategy {
66+
static final DropLatest INSTANCE = new DropLatest();
67+
68+
private DropLatest() {}
69+
70+
@Override
71+
public boolean mayAttemptDrop() {
72+
return false;
73+
}
74+
}
75+
76+
/**
77+
* {@code onError} a MissingBackpressureException and unsubscribe from source.
78+
*/
79+
static class Error implements BackpressureOverflow.Strategy {
80+
81+
static final Error INSTANCE = new Error();
82+
83+
private Error() {}
84+
85+
@Override
86+
public boolean mayAttemptDrop() throws MissingBackpressureException {
87+
throw new MissingBackpressureException("Overflowed buffer");
88+
}
89+
}
90+
}

src/main/java/rx/Observable.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6399,7 +6399,8 @@ public final Observable<T> onBackpressureBuffer() {
63996399
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
64006400
* </dl>
64016401
*
6402-
* @return the source Observable modified to buffer items up to the given capacity
6402+
* @param capacity number of slots available in the buffer.
6403+
* @return the source {@code Observable} modified to buffer items up to the given capacity.
64036404
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
64046405
* @since 1.1.0
64056406
*/
@@ -6419,14 +6420,51 @@ public final Observable<T> onBackpressureBuffer(long capacity) {
64196420
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
64206421
* </dl>
64216422
*
6422-
* @return the source Observable modified to buffer items up to the given capacity
6423+
* @param capacity number of slots available in the buffer.
6424+
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
6425+
* @return the source {@code Observable} modified to buffer items up to the given capacity
64236426
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
64246427
* @since 1.1.0
64256428
*/
64266429
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
64276430
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
64286431
}
64296432

6433+
/**
6434+
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer up to
6435+
* a given amount of items until they can be emitted. The resulting Observable will behave as determined
6436+
* by {@code overflowStrategy} if the buffer capacity is exceeded.
6437+
*
6438+
* <ul>
6439+
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_ERROR} (default) will {@code onError} dropping all undelivered items,
6440+
* unsubscribing from the source, and notifying the producer with {@code onOverflow}. </li>
6441+
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_LATEST} will drop any new items emitted by the producer while
6442+
* the buffer is full, without generating any {@code onError}. Each drop will however invoke {@code onOverflow}
6443+
* to signal the overflow to the producer.</li>j
6444+
* <li>{@code BackpressureOverflow.Strategy.ON_OVERFLOW_DROP_OLDEST} will drop the oldest items in the buffer in order to make
6445+
* room for newly emitted ones. Overflow will not generate an{@code onError}, but each drop will invoke
6446+
* {@code onOverflow} to signal the overflow to the producer.</li>
6447+
* </ul>
6448+
*
6449+
* <p>
6450+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
6451+
* <dl>
6452+
* <dt><b>Scheduler:</b></dt>
6453+
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
6454+
* </dl>
6455+
*
6456+
* @param capacity number of slots available in the buffer.
6457+
* @param onOverflow action to execute if an item needs to be buffered, but there are no available slots. Null is allowed.
6458+
* @param overflowStrategy how should the {@code Observable} react to buffer overflows. Null is not allowed.
6459+
* @return the source {@code Observable} modified to buffer items up to the given capacity
6460+
* @see <a href="http://reactivex.io/documentation/operators/backpressure.html">ReactiveX operators documentation: backpressure operators</a>
6461+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
6462+
*/
6463+
@Experimental
6464+
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow, BackpressureOverflow.Strategy overflowStrategy) {
6465+
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow, overflowStrategy));
6466+
}
6467+
64306468
/**
64316469
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
64326470
* rather than emit, those items that its observer is not prepared to observe.

src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 67 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020
import java.util.concurrent.atomic.AtomicLong;
2121

22+
import rx.BackpressureOverflow;
2223
import rx.Observable.Operator;
2324
import rx.Producer;
2425
import rx.Subscriber;
@@ -27,15 +28,18 @@
2728
import rx.functions.Action0;
2829
import rx.internal.util.BackpressureDrainManager;
2930

31+
import static rx.BackpressureOverflow.*;
32+
3033
public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {
3134

3235
private final Long capacity;
3336
private final Action0 onOverflow;
37+
private final BackpressureOverflow.Strategy overflowStrategy;
3438

3539
private static class Holder {
3640
static final OperatorOnBackpressureBuffer<?> INSTANCE = new OperatorOnBackpressureBuffer<Object>();
3741
}
38-
42+
3943
@SuppressWarnings("unchecked")
4044
public static <T> OperatorOnBackpressureBuffer<T> instance() {
4145
return (OperatorOnBackpressureBuffer<T>) Holder.INSTANCE;
@@ -44,33 +48,65 @@ public static <T> OperatorOnBackpressureBuffer<T> instance() {
4448
OperatorOnBackpressureBuffer() {
4549
this.capacity = null;
4650
this.onOverflow = null;
51+
this.overflowStrategy = ON_OVERFLOW_DEFAULT;
4752
}
4853

54+
/**
55+
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
56+
* following behavior config:
57+
*
58+
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
59+
*/
4960
public OperatorOnBackpressureBuffer(long capacity) {
50-
this(capacity, null);
61+
this(capacity, null, ON_OVERFLOW_DEFAULT);
5162
}
5263

64+
/**
65+
* Construct a new instance that will handle overflows with {@code ON_OVERFLOW_DEFAULT}, providing the
66+
* following behavior config:
67+
*
68+
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
69+
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
70+
*/
5371
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
72+
this(capacity, onOverflow, ON_OVERFLOW_DEFAULT);
73+
}
74+
75+
/**
76+
* Construct a new instance feeding the following behavior config:
77+
*
78+
* @param capacity the max number of items to be admitted in the buffer, must be greater than 0.
79+
* @param onOverflow the {@code Action0} to execute when the buffer overflows, may be null.
80+
* @param overflowStrategy the {@code BackpressureOverflow.Strategy} to handle overflows, it must not be null.
81+
*/
82+
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow,
83+
BackpressureOverflow.Strategy overflowStrategy) {
5484
if (capacity <= 0) {
5585
throw new IllegalArgumentException("Buffer capacity must be > 0");
5686
}
87+
if (overflowStrategy == null) {
88+
throw new NullPointerException("The BackpressureOverflow strategy must not be null");
89+
}
5790
this.capacity = capacity;
5891
this.onOverflow = onOverflow;
92+
this.overflowStrategy = overflowStrategy;
5993
}
6094

6195
@Override
6296
public Subscriber<? super T> call(final Subscriber<? super T> child) {
6397

6498
// don't pass through subscriber as we are async and doing queue draining
6599
// a parent being unsubscribed should not affect the children
66-
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow);
100+
BufferSubscriber<T> parent = new BufferSubscriber<T>(child, capacity, onOverflow,
101+
overflowStrategy);
67102

68103
// if child unsubscribes it should unsubscribe the parent, but not the other way around
69104
child.add(parent);
70105
child.setProducer(parent.manager());
71106

72107
return parent;
73108
}
109+
74110
private static final class BufferSubscriber<T> extends Subscriber<T> implements BackpressureDrainManager.BackpressureQueueCallback {
75111
// TODO get a different queue implementation
76112
private final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
@@ -81,14 +117,18 @@ private static final class BufferSubscriber<T> extends Subscriber<T> implements
81117
private final BackpressureDrainManager manager;
82118
private final NotificationLite<T> on = NotificationLite.instance();
83119
private final Action0 onOverflow;
120+
private final BackpressureOverflow.Strategy overflowStrategy;
84121

85-
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow) {
122+
public BufferSubscriber(final Subscriber<? super T> child, Long capacity, Action0 onOverflow,
123+
BackpressureOverflow.Strategy overflowStrategy) {
86124
this.child = child;
87125
this.baseCapacity = capacity;
88126
this.capacity = capacity != null ? new AtomicLong(capacity) : null;
89127
this.onOverflow = onOverflow;
90128
this.manager = new BackpressureDrainManager(this);
129+
this.overflowStrategy = overflowStrategy;
91130
}
131+
92132
@Override
93133
public void onStart() {
94134
request(Long.MAX_VALUE);
@@ -141,7 +181,7 @@ public Object poll() {
141181
}
142182
return value;
143183
}
144-
184+
145185
private boolean assertCapacity() {
146186
if (capacity == null) {
147187
return true;
@@ -151,24 +191,30 @@ private boolean assertCapacity() {
151191
do {
152192
currCapacity = capacity.get();
153193
if (currCapacity <= 0) {
154-
if (saturated.compareAndSet(false, true)) {
155-
unsubscribe();
156-
child.onError(new MissingBackpressureException(
157-
"Overflowed buffer of "
158-
+ baseCapacity));
159-
if (onOverflow != null) {
160-
try {
161-
onOverflow.call();
162-
} catch (Throwable e) {
163-
Exceptions.throwIfFatal(e);
164-
manager.terminateAndDrain(e);
165-
// this line not strictly necessary but nice for clarity
166-
// and in case of future changes to code after this catch block
167-
return false;
168-
}
194+
boolean hasCapacity = false;
195+
try {
196+
// ok if we're allowed to drop, and there is indeed an item to discard
197+
hasCapacity = overflowStrategy.mayAttemptDrop() && poll() != null;
198+
} catch (MissingBackpressureException e) {
199+
if (saturated.compareAndSet(false, true)) {
200+
unsubscribe();
201+
child.onError(e);
169202
}
170203
}
171-
return false;
204+
if (onOverflow != null) {
205+
try {
206+
onOverflow.call();
207+
} catch (Throwable e) {
208+
Exceptions.throwIfFatal(e);
209+
manager.terminateAndDrain(e);
210+
// this line not strictly necessary but nice for clarity
211+
// and in case of future changes to code after this catch block
212+
return false;
213+
}
214+
}
215+
if (!hasCapacity) {
216+
return false;
217+
}
172218
}
173219
// ensure no other thread stole our slot, or retry
174220
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));

0 commit comments

Comments
 (0)