Add optional bound to onBackpressureBuffer #1899

Closed
wants to merge 1 commit into from
38 changes: 38 additions & 0 deletions src/main/java/rx/Observable.java
@@ -5034,6 +5034,44 @@ public final Observable<T> onBackpressureBuffer() {
return lift(new OperatorOnBackpressureBuffer<T>());
}

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
* up to a given number of items until they can be emitted. The resulting Observable signals a
* {@link java.nio.BufferOverflowException} via {@code onError} as soon as the buffer's capacity is exceeded,
* dropping all undelivered items and unsubscribing from the source.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity the maximum number of items to buffer before signaling an error
* @return the source Observable modified to buffer items up to the given capacity.
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
public final Observable<T> onBackpressureBuffer(long capacity) {
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
}
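As a usage note beyond the diff itself, here is a minimal sketch of the bounded overload. It reuses the request-before-subscribe pattern from the tests further down; `Observable.range` and the concrete numbers (3-slot buffer, 10 items, 2 requested) are illustrative assumptions, not part of this change.

```java
import java.nio.BufferOverflowException;

import rx.Observable;
import rx.observers.TestSubscriber;

public class BoundedBufferSketch {
    public static void main(String[] args) {
        // The child only requests 2 items up front, so everything else must be buffered.
        TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
        ts.requestMore(2);

        // 10 items against a 3-slot buffer: after the 2 requested items are delivered
        // and 3 more are buffered, the next emission overflows the buffer.
        Observable.range(1, 10)
                .onBackpressureBuffer(3)
                .subscribe(ts);

        System.out.println(ts.getOnNextEvents().size());   // 2
        System.out.println(ts.getOnErrorEvents().get(0)
                instanceof BufferOverflowException);        // true
    }
}
```

Note that the operator itself requests Long.MAX_VALUE from the source, so the bound limits how many undelivered items may sit in its internal queue, not how fast the source emits.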

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
* up to a given number of items until they can be emitted. The resulting Observable signals a
* {@link java.nio.BufferOverflowException} via {@code onError} as soon as the buffer's capacity is exceeded,
* dropping all undelivered items, unsubscribing from the source, and invoking the {@code onOverflow} callback.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param capacity the maximum number of items to buffer before signaling an error
* @param onOverflow the callback invoked when the buffer overflows
* @return the source Observable modified to buffer items up to the given capacity.
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
*/
public final Observable<T> onBackpressureBuffer(long capacity, Func0<Void> onOverflow) {
Member

I suggest using an Action0 here.

Contributor Author

Makes sense. I'll add this as soon as I get some time to fix the broken tests.

return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}
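Picking up the review exchange above: a hypothetical sketch of how this overload might look if the callback were switched to `Action0` as suggested. This is not part of the diff, and `OperatorOnBackpressureBuffer` would need a matching constructor for it to compile.

```java
// Hypothetical variant following the review suggestion (not in this diff).
// It assumes OperatorOnBackpressureBuffer also gains a (long, Action0) constructor,
// with Action0 coming from rx.functions.
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
    return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
}
```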

/**
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
* rather than emit, those items that its observer is not prepared to observe.
src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java
@@ -15,23 +15,46 @@
*/
package rx.internal.operators;

import java.nio.BufferOverflowException;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import rx.Observable.Operator;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Func0;

public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {

private final NotificationLite<T> on = NotificationLite.instance();

private final Long capacity;
private final Func0<Void> onOverflow;

public OperatorOnBackpressureBuffer() {
this.capacity = null;
this.onOverflow = null;
}

public OperatorOnBackpressureBuffer(long capacity) {
this(capacity, null);
}

public OperatorOnBackpressureBuffer(long capacity, Func0<Void> onOverflow) {
if (capacity <= 0) {
throw new IllegalArgumentException("Buffer capacity must be > 0");
}
this.capacity = capacity;
this.onOverflow = onOverflow;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
// TODO get a different queue implementation
// TODO start with size hint
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
final AtomicLong wip = new AtomicLong();
final AtomicLong requested = new AtomicLong();

@@ -40,14 +63,17 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
@Override
public void request(long n) {
if (requested.getAndAdd(n) == 0) {
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}
}

});
// don't pass through subscriber as we are async and doing queue draining
// a parent being unsubscribed should not affect the children
Subscriber<T> parent = new Subscriber<T>() {

private AtomicBoolean saturated = new AtomicBoolean(false);

@Override
public void onStart() {
request(Long.MAX_VALUE);
@@ -56,21 +82,47 @@ public void onStart() {
@Override
public void onCompleted() {
queue.offer(on.completed());
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}

@Override
public void onError(Throwable e) {
queue.offer(on.error(e));
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}

@Override
public void onNext(T t) {
if (!ensureCapacity()) {
return;
}
queue.offer(on.next(t));
pollQueue(wip, requested, queue, child);
pollQueue(wip, requested, capacity, queue, child);
}

private boolean ensureCapacity() {
if (capacity == null) {
return true;
}

long currCapacity;
do {
currCapacity = capacity.get();
if (currCapacity <= 0) {
if (saturated.compareAndSet(false, true)) {
// ensure single completion contract
child.onError(new BufferOverflowException());
unsubscribe();
if (onOverflow != null) {
onOverflow.call();
}
}
return false;
}
// ensure no other thread stole our slot, or retry
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
return true;
}
};

// if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -79,7 +131,7 @@ public void onNext(T t) {
return parent;
}

private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue, Subscriber<? super T> child) {
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
if (requested.get() > 0) {
// only one draining at a time
@@ -96,6 +148,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
requested.incrementAndGet();
return;
}
if (capacity != null) { // it's bounded
capacity.incrementAndGet();
}
on.accept(child, o);
} else {
// we hit the end ... so increment back to 0 again
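The bounded path above reserves a buffer slot with a compare-and-set loop before enqueuing (`ensureCapacity`) and gives the slot back when an item is drained in `pollQueue`. A self-contained sketch of that reservation pattern follows; the class and method names are illustrative and not part of the diff.

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative helper, not part of the diff: mirrors ensureCapacity() (acquire a slot
// before offering to the queue) and the drain path (release a slot after delivery).
final class SlotCounter {
    private final AtomicLong slots;

    SlotCounter(long capacity) {
        this.slots = new AtomicLong(capacity);
    }

    /** Try to take one slot; returns false when the buffer is already full. */
    boolean tryAcquire() {
        long current;
        do {
            current = slots.get();
            if (current <= 0) {
                return false; // saturated: the caller signals BufferOverflowException
            }
            // another producer may have taken the slot first; if so, retry
        } while (!slots.compareAndSet(current, current - 1));
        return true;
    }

    /** Return a slot once the buffered item has been handed to the child subscriber. */
    void release() {
        slots.incrementAndGet();
    }
}
```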
src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java
@@ -15,8 +15,7 @@
*/
package rx.internal.operators;

import static org.junit.Assert.assertEquals;

import java.nio.BufferOverflowException;
import java.util.concurrent.CountDownLatch;

import org.junit.Test;
@@ -25,9 +24,15 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func0;
import rx.observables.ConnectableObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class OperatorOnBackpressureBufferTest {

@Test
@@ -81,6 +86,68 @@ public void onNext(Long t) {
assertEquals(499, ts.getOnNextEvents().get(499).intValue());
}

@Test(expected = IllegalArgumentException.class)
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
Observable.empty().onBackpressureBuffer(-1);
}

@Test(expected = IllegalArgumentException.class)
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
Observable.empty().onBackpressureBuffer(0);
}

@Test(timeout = 500)
public void testFixBackpressureBoundedBuffer() throws InterruptedException {
final CountDownLatch l1 = new CountDownLatch(250);
final CountDownLatch l2 = new CountDownLatch(500);
final CountDownLatch l3 = new CountDownLatch(1);
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {

@Override
public void onCompleted() {
}

@Override
public void onError(Throwable e) {
}

@Override
public void onNext(Long t) {
l1.countDown();
l2.countDown();
}

});

ts.requestMore(500);

final ConnectableObservable<Long> flood =
infinite.subscribeOn(Schedulers.computation())
.publish();
final ConnectableObservable<Long> batch =
infinite.subscribeOn(Schedulers.computation())
.onBackpressureBuffer(100, new Func0<Void>() {
@Override
public Void call() {
l3.countDown();
return null;
}
}).publish();
Subscription s = batch.subscribe(ts);
batch.connect(); // first controlled batch

l1.await();
flood.connect(); // open flood
l2.await(); // ts can only swallow 250 more
l3.await(); // hold until it chokes

assertEquals(500, ts.getOnNextEvents().size());
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
assertTrue(ts.getOnErrorEvents().get(0) instanceof BufferOverflowException);
assertTrue(s.isUnsubscribed());

}

static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {

@Override
@@ -92,4 +159,5 @@ public void call(Subscriber<? super Long> s) {
}

});

}