Skip to content

Commit 9fa580c

Browse files
committed
Changed operator names, fixed backpressure.
1 parent dad2a8c commit 9fa580c

File tree

4 files changed

+125
-93
lines changed

4 files changed

+125
-93
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7840,8 +7840,8 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
78407840
* @see Observable#takeWhile(Func1)
78417841
*/
78427842
@Experimental
7843-
public final Observable<T> doTakeWhile(final Func1<? super T, Boolean> predicate) {
7844-
return lift(new OperatorDoTakeWhile<T>(predicate));
7843+
public final Observable<T> takeUntil(final Func1<? super T, Boolean> predicate) {
7844+
return lift(new OperatorTakeUntilPredicate<T>(predicate));
78457845
}
78467846

78477847
/**

src/main/java/rx/internal/operators/OperatorDoTakeWhile.java

Lines changed: 0 additions & 83 deletions
This file was deleted.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.*;
20+
import rx.annotations.Experimental;
21+
import rx.functions.Func1;
22+
23+
/**
24+
* Returns an Observable that emits items emitted by the source Observable until
25+
* the provided predicate returns false
26+
* <p>
27+
*/
28+
@Experimental
29+
public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
30+
/** Subscriber returned to the upstream. */
31+
private final class ParentSubscriber extends Subscriber<T> {
32+
private final Subscriber<? super T> child;
33+
private boolean done = false;
34+
35+
private ParentSubscriber(Subscriber<? super T> child) {
36+
this.child = child;
37+
}
38+
39+
@Override
40+
public void onNext(T args) {
41+
child.onNext(args);
42+
43+
boolean doContinue = false;
44+
try {
45+
doContinue = predicate.call(args);
46+
} catch (Throwable e) {
47+
done = true;
48+
child.onError(e);
49+
unsubscribe();
50+
return;
51+
}
52+
if (!doContinue) {
53+
done = true;
54+
child.onCompleted();
55+
unsubscribe();
56+
}
57+
}
58+
59+
@Override
60+
public void onCompleted() {
61+
if (!done) {
62+
child.onCompleted();
63+
}
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
if (!done) {
69+
child.onError(e);
70+
}
71+
}
72+
void downstreamRequest(long n) {
73+
request(n);
74+
}
75+
}
76+
77+
private final Func1<? super T, Boolean> predicate;
78+
79+
public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> predicate) {
80+
this.predicate = predicate;
81+
}
82+
83+
@Override
84+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
85+
final ParentSubscriber parent = new ParentSubscriber(child);
86+
child.add(parent); // don't unsubscribe downstream
87+
child.setProducer(new Producer() {
88+
@Override
89+
public void request(long n) {
90+
parent.downstreamRequest(n);
91+
}
92+
});
93+
94+
return parent;
95+
}
96+
97+
}

src/test/java/rx/internal/operators/OperatorDoTakeWhileTest.java renamed to src/test/java/rx/internal/operators/OperatorTakeUntilPredicateTest.java

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,24 @@
1919
import static org.mockito.Matchers.any;
2020
import static org.mockito.Mockito.*;
2121

22-
import org.junit.Test;
22+
import java.util.Arrays;
23+
24+
import org.junit.*;
2325

2426
import rx.*;
2527
import rx.exceptions.TestException;
2628
import rx.functions.Func1;
2729
import rx.internal.util.UtilityFunctions;
30+
import rx.observers.TestSubscriber;
2831
;
2932

30-
public class OperatorDoTakeWhileTest {
33+
public class OperatorTakeUntilPredicateTest {
3134
@Test
3235
public void takeEmpty() {
3336
@SuppressWarnings("unchecked")
3437
Observer<Object> o = mock(Observer.class);
3538

36-
Observable.empty().doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
39+
Observable.empty().takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
3740

3841
verify(o, never()).onNext(any());
3942
verify(o, never()).onError(any(Throwable.class));
@@ -44,7 +47,7 @@ public void takeAll() {
4447
@SuppressWarnings("unchecked")
4548
Observer<Object> o = mock(Observer.class);
4649

47-
Observable.just(1, 2).doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
50+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
4851

4952
verify(o).onNext(1);
5053
verify(o).onNext(2);
@@ -56,7 +59,7 @@ public void takeFirst() {
5659
@SuppressWarnings("unchecked")
5760
Observer<Object> o = mock(Observer.class);
5861

59-
Observable.just(1, 2).doTakeWhile(UtilityFunctions.alwaysFalse()).subscribe(o);
62+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
6063

6164
verify(o).onNext(1);
6265
verify(o, never()).onNext(2);
@@ -68,7 +71,7 @@ public void takeSome() {
6871
@SuppressWarnings("unchecked")
6972
Observer<Object> o = mock(Observer.class);
7073

71-
Observable.just(1, 2, 3).doTakeWhile(new Func1<Integer, Boolean>() {
74+
Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
7275
@Override
7376
public Boolean call(Integer t1) {
7477
return t1 < 2;
@@ -86,7 +89,7 @@ public void functionThrows() {
8689
@SuppressWarnings("unchecked")
8790
Observer<Object> o = mock(Observer.class);
8891

89-
Observable.just(1, 2, 3).doTakeWhile(new Func1<Integer, Boolean>() {
92+
Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
9093
@Override
9194
public Boolean call(Integer t1) {
9295
throw new TestException("Forced failure");
@@ -107,11 +110,26 @@ public void sourceThrows() {
107110
Observable.just(1)
108111
.concatWith(Observable.<Integer>error(new TestException()))
109112
.concatWith(Observable.just(2))
110-
.doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
113+
.takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
111114

112115
verify(o).onNext(1);
113116
verify(o, never()).onNext(2);
114117
verify(o).onError(any(TestException.class));
115118
verify(o, never()).onCompleted();
116119
}
120+
@Test
121+
public void backpressure() {
122+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
123+
@Override
124+
public void onStart() {
125+
request(5);
126+
}
127+
};
128+
129+
Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(ts);
130+
131+
ts.assertNoErrors();
132+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
133+
Assert.assertEquals(0, ts.getOnCompletedEvents().size());
134+
}
117135
}

0 commit comments

Comments
 (0)