Skip to content

Commit dad2a8c

Browse files
committed
Operator DoTakeWhile
1 parent cf5ae70 commit dad2a8c

File tree

3 files changed

+220
-0
lines changed

3 files changed

+220
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7824,6 +7824,26 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
78247824
return lift(new OperatorTakeWhile<T>(predicate));
78257825
}
78267826

7827+
/**
7828+
* Returns an Observable that first emits items emitted by the source Observable,
7829+
* checks the specified condition after each item, and
7830+
* then completes as soon as this condition is not satisfied.
7831+
* <p>
7832+
* The difference between this operator and {@link #takeWhile(Func1)} is that here, the condition is evaluated <b>after</b>
7833+
* the item was emitted.
7834+
*
7835+
* @param predicate
7836+
* a function that evaluates an item emitted by the source Observable and returns a Boolean
7837+
* @return an Observable that first emits items emitted by the source Observable,
7838+
* checks the specified condition after each item, and
7839+
* then completes as soon as this condition is not satisfied.
7840+
* @see Observable#takeWhile(Func1)
7841+
*/
7842+
@Experimental
7843+
public final Observable<T> doTakeWhile(final Func1<? super T, Boolean> predicate) {
7844+
return lift(new OperatorDoTakeWhile<T>(predicate));
7845+
}
7846+
78277847
/**
78287848
* Returns an Observable that emits only the first item emitted by the source Observable during sequential
78297849
* time windows of a specified duration.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
import rx.annotations.Experimental;
21+
import rx.functions.Func1;
22+
23+
/**
24+
* Returns an Observable that emits items emitted by the source Observable until
25+
* the provided predicate returns false
26+
* <p>
27+
*/
28+
@Experimental
29+
public final class OperatorDoTakeWhile<T> implements Operator<T, T> {
30+
31+
private final Func1<? super T, Boolean> predicate;
32+
33+
public OperatorDoTakeWhile(final Func1<? super T, Boolean> predicate) {
34+
this.predicate = predicate;
35+
}
36+
37+
@Override
38+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
39+
Subscriber<T> parent = new Subscriber<T>() {
40+
41+
private boolean done = false;
42+
43+
@Override
44+
public void onNext(T args) {
45+
child.onNext(args);
46+
47+
boolean doContinue = false;
48+
try {
49+
doContinue = predicate.call(args);
50+
} catch (Throwable e) {
51+
done = true;
52+
child.onError(e);
53+
unsubscribe();
54+
return;
55+
}
56+
if (!doContinue) {
57+
done = true;
58+
child.onCompleted();
59+
unsubscribe();
60+
}
61+
}
62+
63+
@Override
64+
public void onCompleted() {
65+
if (!done) {
66+
child.onCompleted();
67+
}
68+
}
69+
70+
@Override
71+
public void onError(Throwable e) {
72+
if (!done) {
73+
child.onError(e);
74+
}
75+
}
76+
77+
};
78+
child.add(parent); // don't unsubscribe downstream
79+
80+
return parent;
81+
}
82+
83+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.*;
21+
22+
import org.junit.Test;
23+
24+
import rx.*;
25+
import rx.exceptions.TestException;
26+
import rx.functions.Func1;
27+
import rx.internal.util.UtilityFunctions;
28+
;
29+
30+
public class OperatorDoTakeWhileTest {
31+
@Test
32+
public void takeEmpty() {
33+
@SuppressWarnings("unchecked")
34+
Observer<Object> o = mock(Observer.class);
35+
36+
Observable.empty().doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
37+
38+
verify(o, never()).onNext(any());
39+
verify(o, never()).onError(any(Throwable.class));
40+
verify(o).onCompleted();
41+
}
42+
@Test
43+
public void takeAll() {
44+
@SuppressWarnings("unchecked")
45+
Observer<Object> o = mock(Observer.class);
46+
47+
Observable.just(1, 2).doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
48+
49+
verify(o).onNext(1);
50+
verify(o).onNext(2);
51+
verify(o, never()).onError(any(Throwable.class));
52+
verify(o).onCompleted();
53+
}
54+
@Test
55+
public void takeFirst() {
56+
@SuppressWarnings("unchecked")
57+
Observer<Object> o = mock(Observer.class);
58+
59+
Observable.just(1, 2).doTakeWhile(UtilityFunctions.alwaysFalse()).subscribe(o);
60+
61+
verify(o).onNext(1);
62+
verify(o, never()).onNext(2);
63+
verify(o, never()).onError(any(Throwable.class));
64+
verify(o).onCompleted();
65+
}
66+
@Test
67+
public void takeSome() {
68+
@SuppressWarnings("unchecked")
69+
Observer<Object> o = mock(Observer.class);
70+
71+
Observable.just(1, 2, 3).doTakeWhile(new Func1<Integer, Boolean>() {
72+
@Override
73+
public Boolean call(Integer t1) {
74+
return t1 < 2;
75+
}
76+
}).subscribe(o);
77+
78+
verify(o).onNext(1);
79+
verify(o).onNext(2);
80+
verify(o, never()).onNext(3);
81+
verify(o, never()).onError(any(Throwable.class));
82+
verify(o).onCompleted();
83+
}
84+
@Test
85+
public void functionThrows() {
86+
@SuppressWarnings("unchecked")
87+
Observer<Object> o = mock(Observer.class);
88+
89+
Observable.just(1, 2, 3).doTakeWhile(new Func1<Integer, Boolean>() {
90+
@Override
91+
public Boolean call(Integer t1) {
92+
throw new TestException("Forced failure");
93+
}
94+
}).subscribe(o);
95+
96+
verify(o).onNext(1);
97+
verify(o, never()).onNext(2);
98+
verify(o, never()).onNext(3);
99+
verify(o).onError(any(TestException.class));
100+
verify(o, never()).onCompleted();
101+
}
102+
@Test
103+
public void sourceThrows() {
104+
@SuppressWarnings("unchecked")
105+
Observer<Object> o = mock(Observer.class);
106+
107+
Observable.just(1)
108+
.concatWith(Observable.<Integer>error(new TestException()))
109+
.concatWith(Observable.just(2))
110+
.doTakeWhile(UtilityFunctions.alwaysTrue()).subscribe(o);
111+
112+
verify(o).onNext(1);
113+
verify(o, never()).onNext(2);
114+
verify(o).onError(any(TestException.class));
115+
verify(o, never()).onCompleted();
116+
}
117+
}

0 commit comments

Comments
 (0)