Skip to content

Commit e2c6ced

Browse files
Merge pull request #1918 from ReactiveX/revert-1916-experimentalOnBackpressureBufferWithCapacity
Revert "Experimental onBackpressureBufferWithCapacity"
2 parents c8c272e + b9d5812 commit e2c6ced

File tree

3 files changed

+6
-172
lines changed

3 files changed

+6
-172
lines changed

src/main/java/rx/Observable.java

Lines changed: 0 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import java.util.*;
1616
import java.util.concurrent.*;
1717

18-
import rx.annotations.Beta;
1918
import rx.annotations.Experimental;
2019
import rx.exceptions.*;
2120
import rx.functions.*;
@@ -5036,48 +5035,6 @@ public final Observable<T> onBackpressureBuffer() {
50365035
return lift(new OperatorOnBackpressureBuffer<T>());
50375036
}
50385037

5039-
/**
5040-
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
5041-
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
5042-
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
5043-
* undelivered items, and unsubscribing from the source.
5044-
* <p>
5045-
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
5046-
* <dl>
5047-
* <dt><b>Scheduler:</b></dt>
5048-
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
5049-
* </dl>
5050-
*
5051-
* @return the source Observable modified to buffer items up to the given capacity.
5052-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5053-
* @Beta
5054-
*/
5055-
@Beta
5056-
public final Observable<T> onBackpressureBuffer(long capacity) {
5057-
return lift(new OperatorOnBackpressureBuffer<T>(capacity));
5058-
}
5059-
5060-
/**
5061-
* Instructs an Observable that is emitting items faster than its observer can consume them to buffer
5062-
* up to a given amount of items until they can be emitted. The resulting Observable will {@code onError} emitting a
5063-
* {@link java.nio.BufferOverflowException} as soon as the buffer's capacity is exceeded, dropping all
5064-
* undelivered items, unsubscribing from the source, and notifying the producer with {@code onOverflow}.
5065-
* <p>
5066-
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/bp.obp.buffer.png" alt="">
5067-
* <dl>
5068-
* <dt><b>Scheduler:</b></dt>
5069-
* <dd>{@code onBackpressureBuffer} does not operate by default on a particular {@link Scheduler}.</dd>
5070-
* </dl>
5071-
*
5072-
* @return the source Observable modified to buffer items up to the given capacity.
5073-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5074-
* @Beta
5075-
*/
5076-
@Beta
5077-
public final Observable<T> onBackpressureBuffer(long capacity, Action0 onOverflow) {
5078-
return lift(new OperatorOnBackpressureBuffer<T>(capacity, onOverflow));
5079-
}
5080-
50815038
/**
50825039
* Instructs an Observable that is emitting items faster than its observer can consume them to discard,
50835040
* rather than emit, those items that its observer is not prepared to observe.

src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java

Lines changed: 6 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -17,44 +17,21 @@
1717

1818
import java.util.Queue;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2120
import java.util.concurrent.atomic.AtomicLong;
2221

2322
import rx.Observable.Operator;
2423
import rx.Producer;
2524
import rx.Subscriber;
26-
import rx.exceptions.MissingBackpressureException;
27-
import rx.functions.Action0;
2825

2926
public class OperatorOnBackpressureBuffer<T> implements Operator<T, T> {
3027

3128
private final NotificationLite<T> on = NotificationLite.instance();
3229

33-
private final Long capacity;
34-
private final Action0 onOverflow;
35-
36-
public OperatorOnBackpressureBuffer() {
37-
this.capacity = null;
38-
this.onOverflow = null;
39-
}
40-
41-
public OperatorOnBackpressureBuffer(long capacity) {
42-
this(capacity, null);
43-
}
44-
45-
public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) {
46-
if (capacity <= 0) {
47-
throw new IllegalArgumentException("Buffer capacity must be > 0");
48-
}
49-
this.capacity = capacity;
50-
this.onOverflow = onOverflow;
51-
}
52-
5330
@Override
5431
public Subscriber<? super T> call(final Subscriber<? super T> child) {
5532
// TODO get a different queue implementation
33+
// TODO start with size hint
5634
final ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>();
57-
final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity);
5835
final AtomicLong wip = new AtomicLong();
5936
final AtomicLong requested = new AtomicLong();
6037

@@ -63,17 +40,14 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
6340
@Override
6441
public void request(long n) {
6542
if (requested.getAndAdd(n) == 0) {
66-
pollQueue(wip, requested, capacity, queue, child);
43+
pollQueue(wip, requested, queue, child);
6744
}
6845
}
6946

7047
});
7148
// don't pass through subscriber as we are async and doing queue draining
7249
// a parent being unsubscribed should not affect the children
7350
Subscriber<T> parent = new Subscriber<T>() {
74-
75-
private AtomicBoolean saturated = new AtomicBoolean(false);
76-
7751
@Override
7852
public void onStart() {
7953
request(Long.MAX_VALUE);
@@ -82,47 +56,21 @@ public void onStart() {
8256
@Override
8357
public void onCompleted() {
8458
queue.offer(on.completed());
85-
pollQueue(wip, requested, capacity, queue, child);
59+
pollQueue(wip, requested, queue, child);
8660
}
8761

8862
@Override
8963
public void onError(Throwable e) {
9064
queue.offer(on.error(e));
91-
pollQueue(wip, requested, capacity, queue, child);
65+
pollQueue(wip, requested, queue, child);
9266
}
9367

9468
@Override
9569
public void onNext(T t) {
96-
if (!ensureCapacity()) {
97-
return;
98-
}
9970
queue.offer(on.next(t));
100-
pollQueue(wip, requested, capacity, queue, child);
71+
pollQueue(wip, requested, queue, child);
10172
}
10273

103-
private boolean ensureCapacity() {
104-
if (capacity == null) {
105-
return true;
106-
}
107-
108-
long currCapacity;
109-
do {
110-
currCapacity = capacity.get();
111-
if (currCapacity <= 0) {
112-
if (saturated.compareAndSet(false, true)) {
113-
// ensure single completion contract
114-
child.onError(new MissingBackpressureException("Overflowed buffer of " + OperatorOnBackpressureBuffer.this.capacity));
115-
unsubscribe();
116-
if (onOverflow != null) {
117-
onOverflow.call();
118-
}
119-
}
120-
return false;
121-
}
122-
// ensure no other thread stole our slot, or retry
123-
} while (!capacity.compareAndSet(currCapacity, currCapacity - 1));
124-
return true;
125-
}
12674
};
12775

12876
// if child unsubscribes it should unsubscribe the parent, but not the other way around
@@ -131,7 +79,7 @@ private boolean ensureCapacity() {
13179
return parent;
13280
}
13381

134-
private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue<Object> queue, Subscriber<? super T> child) {
82+
private void pollQueue(AtomicLong wip, AtomicLong requested, Queue<Object> queue, Subscriber<? super T> child) {
13583
// TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue?
13684
if (requested.get() > 0) {
13785
// only one draining at a time
@@ -148,9 +96,6 @@ private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity
14896
requested.incrementAndGet();
14997
return;
15098
}
151-
if (capacity != null) { // it's bounded
152-
capacity.incrementAndGet();
153-
}
15499
on.accept(child, o);
155100
} else {
156101
// we hit the end ... so increment back to 0 again

src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java

Lines changed: 0 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
2019

21-
import java.nio.BufferOverflowException;
2220
import java.util.concurrent.CountDownLatch;
2321

2422
import org.junit.Test;
@@ -27,10 +25,6 @@
2725
import rx.Observable.OnSubscribe;
2826
import rx.Observer;
2927
import rx.Subscriber;
30-
import rx.Subscription;
31-
import rx.exceptions.MissingBackpressureException;
32-
import rx.functions.Action0;
33-
import rx.observables.ConnectableObservable;
3428
import rx.observers.TestSubscriber;
3529
import rx.schedulers.Schedulers;
3630

@@ -87,67 +81,6 @@ public void onNext(Long t) {
8781
assertEquals(499, ts.getOnNextEvents().get(499).intValue());
8882
}
8983

90-
@Test(expected = IllegalArgumentException.class)
91-
public void testFixBackpressureBufferNegativeCapacity() throws InterruptedException {
92-
Observable.empty().onBackpressureBuffer(-1);
93-
}
94-
95-
@Test(expected = IllegalArgumentException.class)
96-
public void testFixBackpressureBufferZeroCapacity() throws InterruptedException {
97-
Observable.empty().onBackpressureBuffer(-1);
98-
}
99-
100-
@Test(timeout = 500)
101-
public void testFixBackpressureBoundedBuffer() throws InterruptedException {
102-
final CountDownLatch l1 = new CountDownLatch(250);
103-
final CountDownLatch l2 = new CountDownLatch(500);
104-
final CountDownLatch l3 = new CountDownLatch(1);
105-
TestSubscriber<Long> ts = new TestSubscriber<Long>(new Observer<Long>() {
106-
107-
@Override
108-
public void onCompleted() {
109-
}
110-
111-
@Override
112-
public void onError(Throwable e) {
113-
}
114-
115-
@Override
116-
public void onNext(Long t) {
117-
l1.countDown();
118-
l2.countDown();
119-
}
120-
121-
});
122-
123-
ts.requestMore(500);
124-
125-
final ConnectableObservable<Long> flood =
126-
infinite.subscribeOn(Schedulers.computation())
127-
.publish();
128-
final ConnectableObservable<Long> batch =
129-
infinite.subscribeOn(Schedulers.computation())
130-
.onBackpressureBuffer(100, new Action0() {
131-
@Override
132-
public void call() {
133-
l3.countDown();
134-
}
135-
}).publish();
136-
Subscription s = batch.subscribe(ts);
137-
batch.connect(); // first controlled batch
138-
139-
l1.await();
140-
flood.connect(); // open flood
141-
l2.await(); // ts can only swallow 250 more
142-
l3.await(); // hold until it chokes
143-
144-
assertEquals(500, ts.getOnNextEvents().size());
145-
assertEquals(0, ts.getOnNextEvents().get(0).intValue());
146-
assertTrue(ts.getOnErrorEvents().get(0) instanceof MissingBackpressureException);
147-
assertTrue(s.isUnsubscribed());
148-
149-
}
150-
15184
static final Observable<Long> infinite = Observable.create(new OnSubscribe<Long>() {
15285

15386
@Override
@@ -159,5 +92,4 @@ public void call(Subscriber<? super Long> s) {
15992
}
16093

16194
});
162-
16395
}

0 commit comments

Comments
 (0)