Skip to content

Commit 1f08a67

Browse files
Merge pull request #2493 from akarnokd/OperatorDoTakeWhile
Operator TakeUntil with predicate
2 parents e901ffa + e0e630e commit 1f08a67

File tree

3 files changed

+252
-0
lines changed

3 files changed

+252
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7854,6 +7854,26 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
78547854
return lift(new OperatorTakeWhile<T>(predicate));
78557855
}
78567856

7857+
/**
7858+
* Returns an Observable that first emits items emitted by the source Observable,
7859+
* checks the specified condition after each item, and
7860+
* then completes if the condition is satisfied.
7861+
* <p>
7862+
* The difference between this operator and {@link #takeWhile(Func1)} is that here, the condition is evaluated <b>after</b>
7863+
* the item was emitted.
7864+
*
7865+
* @param stopPredicate
7866+
* a function that evaluates an item emitted by the source Observable and returns a Boolean
7867+
* @return an Observable that first emits items emitted by the source Observable,
7868+
* checks the specified condition after each item, and
7869+
* then completes if the condition is satisfied.
7870+
* @see Observable#takeWhile(Func1)
7871+
*/
7872+
@Experimental
7873+
public final Observable<T> takeUntil(final Func1<? super T, Boolean> stopPredicate) {
7874+
return lift(new OperatorTakeUntilPredicate<T>(stopPredicate));
7875+
}
7876+
78577877
/**
78587878
* Returns an Observable that emits only the first item emitted by the source Observable during sequential
78597879
* time windows of a specified duration.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.*;
20+
import rx.annotations.Experimental;
21+
import rx.functions.Func1;
22+
23+
/**
24+
* Returns an Observable that emits items emitted by the source Observable until
25+
* the provided predicate returns false
26+
* <p>
27+
*/
28+
@Experimental
29+
public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
30+
/** Subscriber returned to the upstream. */
31+
private final class ParentSubscriber extends Subscriber<T> {
32+
private final Subscriber<? super T> child;
33+
private boolean done = false;
34+
35+
private ParentSubscriber(Subscriber<? super T> child) {
36+
this.child = child;
37+
}
38+
39+
@Override
40+
public void onNext(T args) {
41+
child.onNext(args);
42+
43+
boolean stop = false;
44+
try {
45+
stop = stopPredicate.call(args);
46+
} catch (Throwable e) {
47+
done = true;
48+
child.onError(e);
49+
unsubscribe();
50+
return;
51+
}
52+
if (stop) {
53+
done = true;
54+
child.onCompleted();
55+
unsubscribe();
56+
}
57+
}
58+
59+
@Override
60+
public void onCompleted() {
61+
if (!done) {
62+
child.onCompleted();
63+
}
64+
}
65+
66+
@Override
67+
public void onError(Throwable e) {
68+
if (!done) {
69+
child.onError(e);
70+
}
71+
}
72+
void downstreamRequest(long n) {
73+
request(n);
74+
}
75+
}
76+
77+
private final Func1<? super T, Boolean> stopPredicate;
78+
79+
public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> stopPredicate) {
80+
this.stopPredicate = stopPredicate;
81+
}
82+
83+
@Override
84+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
85+
final ParentSubscriber parent = new ParentSubscriber(child);
86+
child.add(parent); // don't unsubscribe downstream
87+
child.setProducer(new Producer() {
88+
@Override
89+
public void request(long n) {
90+
parent.downstreamRequest(n);
91+
}
92+
});
93+
94+
return parent;
95+
}
96+
97+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.Arrays;
23+
24+
import org.junit.*;
25+
26+
import rx.*;
27+
import rx.exceptions.TestException;
28+
import rx.functions.Func1;
29+
import rx.internal.util.UtilityFunctions;
30+
import rx.observers.TestSubscriber;
31+
;
32+
33+
public class OperatorTakeUntilPredicateTest {
34+
@Test
35+
public void takeEmpty() {
36+
@SuppressWarnings("unchecked")
37+
Observer<Object> o = mock(Observer.class);
38+
39+
Observable.empty().takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
40+
41+
verify(o, never()).onNext(any());
42+
verify(o, never()).onError(any(Throwable.class));
43+
verify(o).onCompleted();
44+
}
45+
@Test
46+
public void takeAll() {
47+
@SuppressWarnings("unchecked")
48+
Observer<Object> o = mock(Observer.class);
49+
50+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
51+
52+
verify(o).onNext(1);
53+
verify(o).onNext(2);
54+
verify(o, never()).onError(any(Throwable.class));
55+
verify(o).onCompleted();
56+
}
57+
@Test
58+
public void takeFirst() {
59+
@SuppressWarnings("unchecked")
60+
Observer<Object> o = mock(Observer.class);
61+
62+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
63+
64+
verify(o).onNext(1);
65+
verify(o, never()).onNext(2);
66+
verify(o, never()).onError(any(Throwable.class));
67+
verify(o).onCompleted();
68+
}
69+
@Test
70+
public void takeSome() {
71+
@SuppressWarnings("unchecked")
72+
Observer<Object> o = mock(Observer.class);
73+
74+
Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
75+
@Override
76+
public Boolean call(Integer t1) {
77+
return t1 == 2;
78+
}
79+
}).subscribe(o);
80+
81+
verify(o).onNext(1);
82+
verify(o).onNext(2);
83+
verify(o, never()).onNext(3);
84+
verify(o, never()).onError(any(Throwable.class));
85+
verify(o).onCompleted();
86+
}
87+
@Test
88+
public void functionThrows() {
89+
@SuppressWarnings("unchecked")
90+
Observer<Object> o = mock(Observer.class);
91+
92+
Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
93+
@Override
94+
public Boolean call(Integer t1) {
95+
throw new TestException("Forced failure");
96+
}
97+
}).subscribe(o);
98+
99+
verify(o).onNext(1);
100+
verify(o, never()).onNext(2);
101+
verify(o, never()).onNext(3);
102+
verify(o).onError(any(TestException.class));
103+
verify(o, never()).onCompleted();
104+
}
105+
@Test
106+
public void sourceThrows() {
107+
@SuppressWarnings("unchecked")
108+
Observer<Object> o = mock(Observer.class);
109+
110+
Observable.just(1)
111+
.concatWith(Observable.<Integer>error(new TestException()))
112+
.concatWith(Observable.just(2))
113+
.takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
114+
115+
verify(o).onNext(1);
116+
verify(o, never()).onNext(2);
117+
verify(o).onError(any(TestException.class));
118+
verify(o, never()).onCompleted();
119+
}
120+
@Test
121+
public void backpressure() {
122+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
123+
@Override
124+
public void onStart() {
125+
request(5);
126+
}
127+
};
128+
129+
Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(ts);
130+
131+
ts.assertNoErrors();
132+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
133+
Assert.assertEquals(0, ts.getOnCompletedEvents().size());
134+
}
135+
}

0 commit comments

Comments
 (0)