Skip to content

Commit c548747

Browse files
Merge pull request ReactiveX#1907 from benjchristensen/onBackpressureBlock
Experimental: onBackpressureBlock
2 parents f46d7f9 + 181c0aa commit c548747

File tree

3 files changed

+462
-1
lines changed

3 files changed

+462
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
import java.util.*;
1616
import java.util.concurrent.*;
1717

18+
import rx.annotations.Experimental;
1819
import rx.exceptions.*;
1920
import rx.functions.*;
2021
import rx.internal.operators.*;
2122
import rx.internal.util.ScalarSynchronousObservable;
2223
import rx.internal.util.UtilityFunctions;
23-
2424
import rx.observables.*;
2525
import rx.observers.SafeSubscriber;
2626
import rx.plugins.*;
@@ -182,6 +182,7 @@ public void call(Subscriber<? super R> o) {
182182
* @return the source Observable, transformed by the transformer function
183183
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
184184
*/
185+
@SuppressWarnings("unchecked")
185186
public <R> Observable<R> compose(Transformer<? super T, ? extends R> transformer) {
186187
return ((Transformer<T, R>) transformer).call(this);
187188
}
@@ -5054,6 +5055,47 @@ public final Observable<T> onBackpressureDrop() {
50545055
return lift(new OperatorOnBackpressureDrop<T>());
50555056
}
50565057

5058+
/**
5059+
* Instructs an Observable that is emitting items faster than its observer can consume them is to
5060+
* block the producer thread.
5061+
* <p>
5062+
* The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the
5063+
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
5064+
* and doesn't emit more than requested even if more is available. For example, using
5065+
* {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException.
5066+
* <p>
5067+
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5068+
* and doesn't propagate any backpressure requests from downstream.
5069+
*
5070+
* @param maxQueueLength the maximum number of items the producer can emit without blocking
5071+
* @return the source Observable modified to block {@code onNext} notifications on overflow
5072+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5073+
* @Experimental The behavior of this can change at any time.
5074+
*/
5075+
@Experimental
5076+
public final Observable<T> onBackpressureBlock(int maxQueueLength) {
5077+
return lift(new OperatorOnBackpressureBlock<T>(maxQueueLength));
5078+
}
5079+
/**
5080+
* Instructs an Observable that is emitting items faster than its observer can consume them is to
5081+
* block the producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size.
5082+
* <p>
5083+
* The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but the
5084+
* consumer side considers the amount its downstream requested through {@code Producer.request(n)}
5085+
* and doesn't emit more than requested even if available.
5086+
* <p>
5087+
* Note that if the upstream Observable does support backpressure, this operator ignores that capability
5088+
* and doesn't propagate any backpressure requests from downstream.
5089+
*
5090+
* @return the source Observable modified to block {@code onNext} notifications on overflow
5091+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
5092+
* @Experimental The behavior of this can change at any time.
5093+
*/
5094+
@Experimental
5095+
public final Observable<T> onBackpressureBlock() {
5096+
return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE);
5097+
}
5098+
50575099
/**
50585100
* Instructs an Observable to pass control to another Observable rather than invoking
50595101
* {@link Observer#onError onError} if it encounters an error.
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.ArrayBlockingQueue;
20+
import java.util.concurrent.BlockingQueue;
21+
22+
import rx.Observable.Operator;
23+
import rx.Producer;
24+
import rx.Subscriber;
25+
26+
/**
27+
* Operator that blocks the producer thread in case a backpressure is needed.
28+
*/
29+
public class OperatorOnBackpressureBlock<T> implements Operator<T, T> {
30+
final int max;
31+
public OperatorOnBackpressureBlock(int max) {
32+
this.max = max;
33+
}
34+
@Override
35+
public Subscriber<? super T> call(Subscriber<? super T> child) {
36+
BlockingSubscriber<T> s = new BlockingSubscriber<T>(max, child);
37+
s.init();
38+
return s;
39+
}
40+
41+
static final class BlockingSubscriber<T> extends Subscriber<T> {
42+
final NotificationLite<T> nl = NotificationLite.instance();
43+
final BlockingQueue<Object> queue;
44+
final Subscriber<? super T> child;
45+
/** Guarded by this. */
46+
long requestedCount;
47+
/** Guarded by this. */
48+
boolean emitting;
49+
volatile boolean terminated;
50+
/** Set before terminated, read after terminated. */
51+
Throwable exception;
52+
public BlockingSubscriber(int max, Subscriber<? super T> child) {
53+
this.queue = new ArrayBlockingQueue<Object>(max);
54+
this.child = child;
55+
}
56+
void init() {
57+
child.add(this);
58+
child.setProducer(new Producer() {
59+
@Override
60+
public void request(long n) {
61+
synchronized (BlockingSubscriber.this) {
62+
if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) {
63+
requestedCount = Long.MAX_VALUE;
64+
} else {
65+
requestedCount += n;
66+
}
67+
}
68+
drain();
69+
}
70+
});
71+
}
72+
@Override
73+
public void onNext(T t) {
74+
try {
75+
queue.put(nl.next(t));
76+
drain();
77+
} catch (InterruptedException ex) {
78+
if (!isUnsubscribed()) {
79+
onError(ex);
80+
}
81+
}
82+
}
83+
@Override
84+
public void onError(Throwable e) {
85+
if (!terminated) {
86+
exception = e;
87+
terminated = true;
88+
drain();
89+
}
90+
}
91+
@Override
92+
public void onCompleted() {
93+
terminated = true;
94+
drain();
95+
}
96+
void drain() {
97+
long n;
98+
synchronized (this) {
99+
if (emitting) {
100+
return;
101+
}
102+
emitting = true;
103+
n = requestedCount;
104+
}
105+
boolean skipFinal = false;
106+
try {
107+
while (true) {
108+
int emitted = 0;
109+
while (n > 0) {
110+
Object o = queue.poll();
111+
if (o == null) {
112+
if (terminated) {
113+
if (exception != null) {
114+
child.onError(exception);
115+
} else {
116+
child.onCompleted();
117+
}
118+
return;
119+
}
120+
break;
121+
} else {
122+
child.onNext(nl.getValue(o));
123+
n--;
124+
emitted++;
125+
}
126+
}
127+
synchronized (this) {
128+
// if no backpressure below
129+
if (requestedCount == Long.MAX_VALUE) {
130+
// no new data arrived since the last poll
131+
if (queue.peek() == null) {
132+
skipFinal = true;
133+
emitting = false;
134+
return;
135+
}
136+
n = Long.MAX_VALUE;
137+
} else {
138+
if (emitted == 0) {
139+
skipFinal = true;
140+
emitting = false;
141+
return;
142+
}
143+
requestedCount -= emitted;
144+
n = requestedCount;
145+
}
146+
}
147+
}
148+
} finally {
149+
if (!skipFinal) {
150+
synchronized (this) {
151+
emitting = false;
152+
}
153+
}
154+
}
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)