Skip to content

Commit e0e630e

Browse files
committed
Changed predicate to stopPredicate: stops on becoming true.
1 parent 9fa580c commit e0e630e

File tree

3 files changed

+16
-16
lines changed

3 files changed

+16
-16
lines changed

src/main/java/rx/Observable.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7827,21 +7827,21 @@ public final Observable<T> takeWhile(final Func1<? super T, Boolean> predicate)
78277827
/**
78287828
* Returns an Observable that first emits items emitted by the source Observable,
78297829
* checks the specified condition after each item, and
7830-
* then completes as soon as this condition is not satisfied.
7830+
* then completes if the condition is satisfied.
78317831
* <p>
78327832
* The difference between this operator and {@link #takeWhile(Func1)} is that here, the condition is evaluated <b>after</b>
78337833
* the item was emitted.
78347834
*
7835-
* @param predicate
7835+
* @param stopPredicate
78367836
* a function that evaluates an item emitted by the source Observable and returns a Boolean
78377837
* @return an Observable that first emits items emitted by the source Observable,
78387838
* checks the specified condition after each item, and
7839-
* then completes as soon as this condition is not satisfied.
7839+
* then completes if the condition is satisfied.
78407840
* @see Observable#takeWhile(Func1)
78417841
*/
78427842
@Experimental
7843-
public final Observable<T> takeUntil(final Func1<? super T, Boolean> predicate) {
7844-
return lift(new OperatorTakeUntilPredicate<T>(predicate));
7843+
public final Observable<T> takeUntil(final Func1<? super T, Boolean> stopPredicate) {
7844+
return lift(new OperatorTakeUntilPredicate<T>(stopPredicate));
78457845
}
78467846

78477847
/**

src/main/java/rx/internal/operators/OperatorTakeUntilPredicate.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,16 @@ private ParentSubscriber(Subscriber<? super T> child) {
4040
public void onNext(T args) {
4141
child.onNext(args);
4242

43-
boolean doContinue = false;
43+
boolean stop = false;
4444
try {
45-
doContinue = predicate.call(args);
45+
stop = stopPredicate.call(args);
4646
} catch (Throwable e) {
4747
done = true;
4848
child.onError(e);
4949
unsubscribe();
5050
return;
5151
}
52-
if (!doContinue) {
52+
if (stop) {
5353
done = true;
5454
child.onCompleted();
5555
unsubscribe();
@@ -74,10 +74,10 @@ void downstreamRequest(long n) {
7474
}
7575
}
7676

77-
private final Func1<? super T, Boolean> predicate;
77+
private final Func1<? super T, Boolean> stopPredicate;
7878

79-
public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> predicate) {
80-
this.predicate = predicate;
79+
public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> stopPredicate) {
80+
this.stopPredicate = stopPredicate;
8181
}
8282

8383
@Override

src/test/java/rx/internal/operators/OperatorTakeUntilPredicateTest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public void takeAll() {
4747
@SuppressWarnings("unchecked")
4848
Observer<Object> o = mock(Observer.class);
4949

50-
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
50+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
5151

5252
verify(o).onNext(1);
5353
verify(o).onNext(2);
@@ -59,7 +59,7 @@ public void takeFirst() {
5959
@SuppressWarnings("unchecked")
6060
Observer<Object> o = mock(Observer.class);
6161

62-
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
62+
Observable.just(1, 2).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
6363

6464
verify(o).onNext(1);
6565
verify(o, never()).onNext(2);
@@ -74,7 +74,7 @@ public void takeSome() {
7474
Observable.just(1, 2, 3).takeUntil(new Func1<Integer, Boolean>() {
7575
@Override
7676
public Boolean call(Integer t1) {
77-
return t1 < 2;
77+
return t1 == 2;
7878
}
7979
}).subscribe(o);
8080

@@ -110,7 +110,7 @@ public void sourceThrows() {
110110
Observable.just(1)
111111
.concatWith(Observable.<Integer>error(new TestException()))
112112
.concatWith(Observable.just(2))
113-
.takeUntil(UtilityFunctions.alwaysTrue()).subscribe(o);
113+
.takeUntil(UtilityFunctions.alwaysFalse()).subscribe(o);
114114

115115
verify(o).onNext(1);
116116
verify(o, never()).onNext(2);
@@ -126,7 +126,7 @@ public void onStart() {
126126
}
127127
};
128128

129-
Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysTrue()).subscribe(ts);
129+
Observable.range(1, 1000).takeUntil(UtilityFunctions.alwaysFalse()).subscribe(ts);
130130

131131
ts.assertNoErrors();
132132
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));

0 commit comments

Comments
 (0)