Skip to content

Commit f1bd2a9

Browse files
Merge pull request #2820 from akarnokd/WindowWithSizeBackpressure
Backpressure for window(size)
2 parents 1f3ca5a + 68a356e commit f1bd2a9

File tree

3 files changed

+79
-25
lines changed

3 files changed

+79
-25
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8968,7 +8968,8 @@ public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observa
89688968
* <img width="640" height="400" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window3.png" alt="">
89698969
* <dl>
89708970
* <dt><b>Backpressure Support:</b></dt>
8971-
* <dd>This operator does not support backpressure as it uses {@code count} to control data flow.</dd>
8971+
* <dd>The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
8972+
* but each of them will emit at most {@code count} elements.</dd>
89728973
* <dt><b>Scheduler:</b></dt>
89738974
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
89748975
* </dl>
@@ -8992,7 +8993,8 @@ public final Observable<Observable<T>> window(int count) {
89928993
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window4.png" alt="">
89938994
* <dl>
89948995
* <dt><b>Backpressure Support:</b></dt>
8995-
* <dd>This operator does not support backpressure as it uses {@code count} to control data flow.</dd>
8996+
* <dd>The operator has limited backpressure support. If {@code count} == {@code skip}, the operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
8997+
* but each of them will emit at most {@code count} elements.</dd>
89968998
* <dt><b>Scheduler:</b></dt>
89978999
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>
89989000
* </dl>

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,14 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Iterator;
20-
import java.util.LinkedList;
21-
import java.util.List;
18+
import java.util.*;
2219

23-
import rx.Observable;
20+
import rx.*;
2421
import rx.Observable.Operator;
25-
import rx.Subscription;
22+
import rx.Observable;
23+
import rx.Observer;
2624
import rx.functions.Action0;
2725
import rx.subscriptions.Subscriptions;
28-
import rx.Observer;
29-
import rx.Subscriber;
3026

3127
/**
3228
* Creates windows of values into the source sequence with skip frequency and size bounds.
@@ -78,26 +74,36 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
7874
@Override
7975
public void call() {
8076
// if no window we unsubscribe up otherwise wait until window ends
81-
if(noWindow) {
77+
if (noWindow) {
8278
parentSubscription.unsubscribe();
8379
}
8480
}
8581

8682
}));
87-
}
88-
89-
@Override
90-
public void onStart() {
91-
// no backpressure as we are controlling data flow by window size
92-
request(Long.MAX_VALUE);
83+
child.setProducer(new Producer() {
84+
@Override
85+
public void request(long n) {
86+
if (n > 0) {
87+
long u = n * size;
88+
if (((u >>> 31) != 0) && (u / n != size)) {
89+
u = Long.MAX_VALUE;
90+
}
91+
requestMore(u);
92+
}
93+
}
94+
});
9395
}
9496

97+
void requestMore(long n) {
98+
request(n);
99+
}
100+
95101
@Override
96102
public void onNext(T t) {
97103
if (window == null) {
98104
noWindow = false;
99105
window = BufferUntilSubscriber.create();
100-
child.onNext(window);
106+
child.onNext(window);
101107
}
102108
window.onNext(t);
103109
if (++count % size == 0) {

src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java

Lines changed: 53 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,19 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
18+
import static org.junit.Assert.*;
2019

21-
import java.util.ArrayList;
22-
import java.util.Arrays;
23-
import java.util.List;
20+
import java.util.*;
2421
import java.util.concurrent.TimeUnit;
2522
import java.util.concurrent.atomic.AtomicInteger;
2623

2724
import org.junit.Test;
2825

26+
import static org.mockito.Mockito.*;
27+
import rx.*;
2928
import rx.Observable;
30-
import rx.functions.Action1;
31-
import rx.functions.Func1;
29+
import rx.Observer;
30+
import rx.functions.*;
3231
import rx.observers.TestSubscriber;
3332
import rx.schedulers.Schedulers;
3433

@@ -198,5 +197,52 @@ private List<String> list(String... args) {
198197
}
199198
return list;
200199
}
200+
201+
@Test
202+
public void testBackpressureOuter() {
203+
Observable<Observable<Integer>> source = Observable.range(1, 10).window(3);
204+
205+
final List<Integer> list = new ArrayList<Integer>();
206+
207+
@SuppressWarnings("unchecked")
208+
final Observer<Integer> o = mock(Observer.class);
209+
210+
source.subscribe(new Subscriber<Observable<Integer>>() {
211+
@Override
212+
public void onStart() {
213+
request(1);
214+
}
215+
@Override
216+
public void onNext(Observable<Integer> t) {
217+
t.subscribe(new Observer<Integer>() {
218+
@Override
219+
public void onNext(Integer t) {
220+
list.add(t);
221+
}
222+
@Override
223+
public void onError(Throwable e) {
224+
o.onError(e);
225+
}
226+
@Override
227+
public void onCompleted() {
228+
o.onCompleted();
229+
}
230+
});
231+
}
232+
@Override
233+
public void onError(Throwable e) {
234+
o.onError(e);
235+
}
236+
@Override
237+
public void onCompleted() {
238+
o.onCompleted();
239+
}
240+
});
241+
242+
assertEquals(Arrays.asList(1, 2, 3), list);
243+
244+
verify(o, never()).onError(any(Throwable.class));
245+
verify(o, times(1)).onCompleted(); // 1 inner
246+
}
201247

202248
}

0 commit comments

Comments
 (0)