Skip to content

Commit 3dfae54

Browse files
Merge pull request #146 from benjchristensen/issue-87
Merge of Pull #125 for Issue #87 Operator TakeWhile
2 parents 23f04cc + a69f496 commit 3dfae54

File tree

5 files changed

+213
-22
lines changed

5 files changed

+213
-22
lines changed

language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/observable_tests.clj

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,6 @@
22
(import rx.Observable))
33

44
;; still need to get this wired up in build.gradle to run as tests
5-
; (-> (rx.Observable/toObservable [\"one\" \"two\" \"three\"]) (.take 2) (.subscribe (fn [arg] (println arg))))
5+
; (-> (rx.Observable/toObservable ["one" "two" "three"]) (.take 2) (.subscribe (fn [arg] (println arg))))
6+
7+
; (-> (rx.Observable/toObservable [1 2 3]) (.takeWhile (fn [x i] (< x 2))) (.subscribe (fn [arg] (println arg))))

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,22 @@ def class ObservableTests {
174174
verify(a, times(0)).received(3);
175175
}
176176

177+
@Test
178+
public void testTakeWhileViaGroovy() {
179+
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
180+
verify(a, times(1)).received(1);
181+
verify(a, times(1)).received(2);
182+
verify(a, times(0)).received(3);
183+
}
184+
185+
@Test
186+
public void testTakeWhileWithIndexViaGroovy() {
187+
Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
188+
verify(a, times(1)).received(1);
189+
verify(a, times(1)).received(2);
190+
verify(a, times(0)).received(3);
191+
}
192+
177193
@Test
178194
public void testToSortedList() {
179195
new TestFactory().getNumbers().toSortedList().subscribe({ result -> a.received(result)});

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1370,6 +1370,57 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
13701370
return _create(OperationTakeLast.takeLast(items, count));
13711371
}
13721372

1373+
/**
1374+
* Returns a specified number of contiguous values from the start of an observable sequence.
1375+
*
1376+
* @param items
1377+
* @param predicate a function to test each source element for a condition
1378+
* @return
1379+
*/
1380+
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
1381+
return create(OperationTake.takeWhile(items, predicate));
1382+
}
1383+
1384+
/**
1385+
* Returns a specified number of contiguous values from the start of an observable sequence.
1386+
*
1387+
* @param items
1388+
* @param predicate a function to test each source element for a condition
1389+
* @return
1390+
*/
1391+
public static <T> Observable<T> takeWhile(final Observable<T> items, Object predicate) {
1392+
final FuncN _f = Functions.from(predicate);
1393+
1394+
return takeWhile(items, new Func1<T, Boolean>() {
1395+
@Override
1396+
public Boolean call(T t) {
1397+
return (Boolean) _f.call(t);
1398+
}
1399+
});
1400+
}
1401+
1402+
/**
1403+
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
1404+
*
1405+
* @param items
1406+
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
1407+
* @return
1408+
*/
1409+
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
1410+
return create(OperationTake.takeWhileWithIndex(items, predicate));
1411+
}
1412+
1413+
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
1414+
final FuncN _f = Functions.from(predicate);
1415+
1416+
return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
1417+
@Override
1418+
public Boolean call(T t, Integer integer) {
1419+
return (Boolean) _f.call(t, integer);
1420+
}
1421+
}));
1422+
}
1423+
13731424
/**
13741425
* Returns an Observable that emits a single item, a list composed of all the items emitted by
13751426
* the source Observable.
@@ -2301,6 +2352,47 @@ public Observable<T> take(final int num) {
23012352
return take(this, num);
23022353
}
23032354

2355+
2356+
/**
2357+
* Returns an Observable that items emitted by the source Observable as long as a specified condition is true.
2358+
*
2359+
* @param predicate a function to test each source element for a condition
2360+
* @return
2361+
*/
2362+
public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
2363+
return takeWhile(this, predicate);
2364+
}
2365+
2366+
/**
2367+
* Returns a specified number of contiguous values from the start of an observable sequence.
2368+
*
2369+
* @param predicate a function to test each source element for a condition
2370+
* @return
2371+
*/
2372+
public Observable<T> takeWhile(final Object predicate) {
2373+
return takeWhile(this, predicate);
2374+
}
2375+
2376+
/**
2377+
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
2378+
*
2379+
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
2380+
* @return
2381+
*/
2382+
public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predicate) {
2383+
return takeWhileWithIndex(this, predicate);
2384+
}
2385+
2386+
/**
2387+
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
2388+
*
2389+
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
2390+
* @return
2391+
*/
2392+
public Observable<T> takeWhileWithIndex(final Object predicate) {
2393+
return takeWhileWithIndex(this, predicate);
2394+
}
2395+
23042396
/**
23052397
* Returns an Observable that emits the last <code>count</code> items emitted by the source
23062398
* Observable.

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 101 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -28,33 +28,75 @@
2828
import rx.Subscription;
2929
import rx.util.AtomicObservableSubscription;
3030
import rx.util.functions.Func1;
31+
import rx.util.functions.Func2;
3132

3233
/**
3334
* Returns a specified number of contiguous values from the start of an observable sequence.
34-
*
35-
* @param <T>
3635
*/
3736
public final class OperationTake {
3837

3938
/**
4039
* Returns a specified number of contiguous values from the start of an observable sequence.
41-
*
40+
*
4241
* @param items
4342
* @param num
4443
* @return
4544
*/
4645
public static <T> Func1<Observer<T>, Subscription> take(final Observable<T> items, final int num) {
47-
// wrap in a Watchbable so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
46+
return takeWhileWithIndex(items, OperationTake.<T>numPredicate(num));
47+
}
48+
49+
/**
50+
* Returns a specified number of contiguous values from the start of an observable sequence.
51+
*
52+
* @param items
53+
* @param predicate a function to test each source element for a condition
54+
* @return
55+
*/
56+
public static <T> Func1<Observer<T>, Subscription> takeWhile(final Observable<T> items, final Func1<T, Boolean> predicate) {
57+
return takeWhileWithIndex(items, OperationTake.<T>skipIndex(predicate));
58+
}
59+
60+
/**
61+
* Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values.
62+
*
63+
* @param items
64+
* @param predicate a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
65+
* @return
66+
*/
67+
public static <T> Func1<Observer<T>, Subscription> takeWhileWithIndex(final Observable<T> items, final Func2<T, Integer, Boolean> predicate) {
68+
// wrap in a Func so that if a chain is built up, then asynchronously subscribed to twice we will have 2 instances of Take<T> rather than 1 handing both, which is not thread-safe.
4869
return new Func1<Observer<T>, Subscription>() {
4970

5071
@Override
5172
public Subscription call(Observer<T> observer) {
52-
return new Take<T>(items, num).call(observer);
73+
return new TakeWhile<T>(items, predicate).call(observer);
5374
}
5475

5576
};
5677
}
5778

79+
private static <T> Func2<T, Integer, Boolean> numPredicate(final int num) {
80+
return new Func2<T, Integer, Boolean>() {
81+
82+
@Override
83+
public Boolean call(T input, Integer index) {
84+
return index < num;
85+
}
86+
87+
};
88+
}
89+
90+
private static <T> Func2<T, Integer, Boolean> skipIndex(final Func1<T, Boolean> underlying) {
91+
return new Func2<T, Integer, Boolean>() {
92+
@Override
93+
public Boolean call(T input, Integer index) {
94+
return underlying.call(input);
95+
}
96+
};
97+
}
98+
99+
58100
/**
59101
* This class is NOT thread-safe if invoked and referenced multiple times. In other words, don't subscribe to it multiple times from different threads.
60102
* <p>
@@ -63,29 +105,27 @@ public Subscription call(Observer<T> observer) {
63105
* This should all be fine as long as it's kept as a private class and a new instance created from static factory method above.
64106
* <p>
65107
* Note how the take() factory method above protects us from a single instance being exposed with the Observable wrapper handling the subscribe flow.
66-
*
108+
*
67109
* @param <T>
68110
*/
69-
private static class Take<T> implements Func1<Observer<T>, Subscription> {
70-
private final int num;
111+
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
112+
private final AtomicInteger counter = new AtomicInteger();
71113
private final Observable<T> items;
114+
private final Func2<T, Integer, Boolean> predicate;
72115
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
73116

74-
Take(final Observable<T> items, final int num) {
75-
this.num = num;
117+
private TakeWhile(Observable<T> items, Func2<T, Integer, Boolean> predicate) {
76118
this.items = items;
119+
this.predicate = predicate;
77120
}
78121

122+
123+
@Override
79124
public Subscription call(Observer<T> observer) {
80125
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
81126
}
82127

83-
/**
84-
* Used to subscribe to the 'items' Observable sequence and forward to the actualObserver up to 'num' count.
85-
*/
86128
private class ItemObserver implements Observer<T> {
87-
88-
private AtomicInteger counter = new AtomicInteger();
89129
private final Observer<T> observer;
90130

91131
public ItemObserver(Observer<T> observer) {
@@ -104,7 +144,7 @@ public void onError(Exception e) {
104144

105145
@Override
106146
public void onNext(T args) {
107-
if (counter.getAndIncrement() < num) {
147+
if (predicate.call(args, counter.getAndIncrement())) {
108148
observer.onNext(args);
109149
} else {
110150
// this will work if the sequence is asynchronous, it will have no effect on a synchronous observable
@@ -118,6 +158,48 @@ public void onNext(T args) {
118158

119159
public static class UnitTest {
120160

161+
162+
163+
@Test
164+
public void testTakeWhile1() {
165+
Observable<Integer> w = Observable.toObservable(1, 2, 3);
166+
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>() {
167+
@Override
168+
public Boolean call(Integer input) {
169+
return input < 3;
170+
}
171+
}));
172+
173+
@SuppressWarnings("unchecked")
174+
Observer<Integer> aObserver = mock(Observer.class);
175+
take.subscribe(aObserver);
176+
verify(aObserver, times(1)).onNext(1);
177+
verify(aObserver, times(1)).onNext(2);
178+
verify(aObserver, never()).onNext(3);
179+
verify(aObserver, never()).onError(any(Exception.class));
180+
verify(aObserver, times(1)).onCompleted();
181+
}
182+
183+
@Test
184+
public void testTakeWhile2() {
185+
Observable<String> w = Observable.toObservable("one", "two", "three");
186+
Observable<String> take = Observable.create(takeWhileWithIndex(w, new Func2<String, Integer, Boolean>() {
187+
@Override
188+
public Boolean call(String input, Integer index) {
189+
return index < 2;
190+
}
191+
}));
192+
193+
@SuppressWarnings("unchecked")
194+
Observer<String> aObserver = mock(Observer.class);
195+
take.subscribe(aObserver);
196+
verify(aObserver, times(1)).onNext("one");
197+
verify(aObserver, times(1)).onNext("two");
198+
verify(aObserver, never()).onNext("three");
199+
verify(aObserver, never()).onError(any(Exception.class));
200+
verify(aObserver, times(1)).onCompleted();
201+
}
202+
121203
@Test
122204
public void testTake1() {
123205
Observable<String> w = Observable.toObservable("one", "two", "three");

rxjava-core/src/main/java/rx/util/functions/Functions.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* <p>
2727
* Language support is provided via implementations of {@link FunctionLanguageAdaptor}.
2828
* <p>
29-
* This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class, FunctionLanguageAdaptor)}.
29+
* This class will dynamically look for known language adaptors on the classpath at startup or new ones can be registered using {@link #registerLanguageAdaptor(Class[], FunctionLanguageAdaptor)}.
3030
*/
3131
public class Functions {
3232

@@ -81,7 +81,6 @@ public static Collection<FunctionLanguageAdaptor> getRegisteredLanguageAdaptors(
8181
* Utility method for determining the type of closure/function and executing it.
8282
*
8383
* @param function
84-
* @param args
8584
*/
8685
@SuppressWarnings({ "rawtypes" })
8786
public static FuncN from(final Object function) {

0 commit comments

Comments
 (0)