Skip to content

Commit c8c272e

Browse files
Merge pull request #1916 from benjchristensen/experimentalOnBackpressureBufferWithCapacity
Experimental onBackpressureBufferWithCapacity
2 parents c020c4a + 656648d commit c8c272e

File tree

3 files changed

+172
-6
lines changed

3 files changed

+172
-6
lines changed

src/main/java/rx/Observable.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.*;
1616
import java.util.concurrent.*;
1717

18+
import rx.annotations.Beta;
1819
import rx.annotations.Experimental;
1920
import rx.exceptions.*;
2021
import rx.functions.*;
@@ -5035,6 +5036,48 @@ public final Observable<T> onBackpressureBuffer() {
50355036
return lift(new OperatorOnBackpressureBuffer<T>());
50365037
}
50375038

5039+
/**
5040+
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
5041+
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
5042+
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
5043+
* undelivered items, and unsubscribing from the source.
5044+
* <p>
5045+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
5046+
* <dl>
5047+
* <dt><b>Scheduler:</b></dt>
5048+
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
5049+
* </dl>
5050+
*
5051+
* @return the source Observable modified to buffer items up to the given capacity.
5052+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5053+
* @Beta
5054+
*/
5055+
@Beta
5056+
public final Observable<T> onBackpressureBuffer(long capacity) {
5057+
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
5058+
}
5059+
5060+
/**
5061+
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
5062+
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
5063+
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
5064+
* undelivered items, unsubscribing from the source, and notifying the producer with {@code onOverflow}.
5065+
* <p>
5066+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
5067+
* <dl>
5068+
* <dt><b>Scheduler:</b></dt>
5069+
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
5070+
* </dl>
5071+
*
5072+
* @return the source Observable modified to buffer items up to the given capacity.
5073+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5074+
* @Beta
5075+
*/
5076+
@Beta
5077+
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
5078+
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
5079+
}
5080+
50385081
/**
50395082
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
50405083
* rather than emit, those items that its observer is not prepared to observe.

src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,44 @@
1717

1818
import java.util.Queue;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021
import java.util.concurrent.atomic.AtomicLong;
2122

2223
import rx.Observable.Operator;
2324
import rx.Producer;
2425
import rx.Subscriber;
26+
import rx.exceptions.MissingBackpressureException;
27+
import rx.functions.Action0;
2528

2629
public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {
2730

2831
private final NotificationLite<T> on = NotificationLite.instance();
2932

33+
private final Long capacity;
34+
private final Action0 onOverflow;
35+
36+
public OperatorOnBackpressureBuffer() {
37+
this.capacity = null;
38+
this.onOverflow = null;
39+
}
40+
41+
public OperatorOnBackpressureBuffer(long capacity) {
42+
this(capacity, null);
43+
}
44+
45+
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
46+
if (capacity <= 0) {
47+
throw new IllegalArgumentException("Buffer capacity must be > 0");
48+
}
49+
this.capacity = capacity;
50+
this.onOverflow = onOverflow;
51+
}
52+
3053
@Override
3154
public Subscriber<? super T> call(final Subscriber<? super T> child) {
3255
// TODO get a different queue implementation
33-
// TODO start with size hint
3456
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
57+
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
3558
final AtomicLong wip = new AtomicLong();
3659
final AtomicLong requested = new AtomicLong();
3760

@@ -40,14 +63,17 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4063
@Override
4164
public void request(long n) {
4265
if (requested.getAndAdd(n) == 0) {
43-
pollQueue(wip, requested, queue, child);
66+
pollQueue(wip, requested, capacity, queue, child);
4467
}
4568
}
4669

4770
});
4871
// don't pass through subscriber as we are async and doing queue draining
4972
// a parent being unsubscribed should not affect the children
5073
Subscriber<T> parent = new Subscriber<T>() {
74+
75+
private AtomicBoolean saturated = new AtomicBoolean(false);
76+
5177
@Override
5278
public void onStart() {
5379
request(Long.MAX_VALUE);
@@ -56,21 +82,47 @@ public void onStart() {
5682
@Override
5783
public void onCompleted() {
5884
queue.offer(on.completed());
59-
pollQueue(wip, requested, queue, child);
85+
pollQueue(wip, requested, capacity, queue, child);
6086
}
6187

6288
@Override
6389
public void onError(Throwable e) {
6490
queue.offer(on.error(e));
65-
pollQueue(wip, requested, queue, child);
91+
pollQueue(wip, requested, capacity, queue, child);
6692
}
6793

6894
@Override
6995
public void onNext(T t) {
96+
if (!ensureCapacity()) {
97+
return;
98+
}
7099
queue.offer(on.next(t));
71-
pollQueue(wip, requested, queue, child);
100+
pollQueue(wip, requested, capacity, queue, child);
72101
}
73102

103+
private boolean ensureCapacity() {
104+
if (capacity == null) {
105+
return true;
106+
}
107+
108+
long currCapacity;
109+
do {
110+
currCapacity = capacity.get();
111+
if (currCapacity <= 0) {
112+
if (saturated.compareAndSet(false, true)) {
113+
// ensure single completion contract
114+
child.onError(new MissingBackpressureException("Overflowed buffer of " + OperatorOnBackpressureBuffer.this.capacity));
115+
unsubscribe();
116+
if (onOverflow != null) {
117+
onOverflow.call();
118+
}
119+
}
120+
return false;
121+
}
122+
// ensure no other thread stole our slot, or retry
123+
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
124+
return true;
125+
}
74126
};
75127

76128
// if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -79,7 +131,7 @@ public void onNext(T t) {
79131
return parent;
80132
}
81133

82-
private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue, Subscriber<? super T> child) {
134+
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
83135
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
84136
if (requested.get() > 0) {
85137
// only one draining at a time
@@ -96,6 +148,9 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue
96148
requested.incrementAndGet();
97149
return;
98150
}
151+
if (capacity != null) { // it's bounded
152+
capacity.incrementAndGet();
153+
}
99154
on.accept(child, o);
100155
} else {
101156
// we hit the end ... so increment back to 0 again

src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920

21+
import java.nio.BufferOverflowException;
2022
import java.util.concurrent.CountDownLatch;
2123

2224
import org.junit.Test;
@@ -25,6 +27,10 @@
2527
import rx.Observable.OnSubscribe;
2628
import rx.Observer;
2729
import rx.Subscriber;
30+
import rx.Subscription;
31+
import rx.exceptions.MissingBackpressureException;
32+
import rx.functions.Action0;
33+
import rx.observables.ConnectableObservable;
2834
import rx.observers.TestSubscriber;
2935
import rx.schedulers.Schedulers;
3036

@@ -81,6 +87,67 @@ public void onNext(Long t) {
8187
assertEquals(499, ts.getOnNextEvents().get(499).intValue());
8288
}
8389

90+
@Test(expected = IllegalArgumentException.class)
91+
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
92+
Observable.empty().onBackpressureBuffer(-1);
93+
}
94+
95+
@Test(expected = IllegalArgumentException.class)
96+
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
97+
Observable.empty().onBackpressureBuffer(-1);
98+
}
99+
100+
@Test(timeout = 500)
101+
public void testFixBackpressureBoundedBuffer() throws InterruptedException {
102+
final CountDownLatch l1 = new CountDownLatch(250);
103+
final CountDownLatch l2 = new CountDownLatch(500);
104+
final CountDownLatch l3 = new CountDownLatch(1);
105+
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
106+
107+
@Override
108+
public void onCompleted() {
109+
}
110+
111+
@Override
112+
public void onError(Throwable e) {
113+
}
114+
115+
@Override
116+
public void onNext(Long t) {
117+
l1.countDown();
118+
l2.countDown();
119+
}
120+
121+
});
122+
123+
ts.requestMore(500);
124+
125+
final ConnectableObservable<Long> flood =
126+
infinite.subscribeOn(Schedulers.computation())
127+
.publish();
128+
final ConnectableObservable<Long> batch =
129+
infinite.subscribeOn(Schedulers.computation())
130+
.onBackpressureBuffer(100, new Action0() {
131+
@Override
132+
public void call() {
133+
l3.countDown();
134+
}
135+
}).publish();
136+
Subscription s = batch.subscribe(ts);
137+
batch.connect(); // first controlled batch
138+
139+
l1.await();
140+
flood.connect(); // open flood
141+
l2.await(); // ts can only swallow 250 more
142+
l3.await(); // hold until it chokes
143+
144+
assertEquals(500, ts.getOnNextEvents().size());
145+
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
146+
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
147+
assertTrue(s.isUnsubscribed());
148+
149+
}
150+
84151
static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {
85152

86153
@Override
@@ -92,4 +159,5 @@ public void call(Subscriber<? super Long> s) {
92159
}
93160

94161
});
162+
95163
}

0 commit comments

Comments
 (0)