Skip to content

Commit d8b8fce

Browse files
Merge branch 'first-firstDefault' of git://github.com/jmhofer/RxJava into operator-first-merge
Conflicts: rxjava-core/src/test/java/rx/ObservableTests.java This merges pull request ReactiveX#357 Also aliased first with takeFirst to match takeLast.
2 parents d77d362 + 27f6042 commit d8b8fce

File tree

5 files changed

+373
-0
lines changed

5 files changed

+373
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx;
1717

18+
import static rx.util.functions.Functions.not;
19+
1820
import java.util.ArrayList;
1921
import java.util.Arrays;
2022
import java.util.Collection;
@@ -37,6 +39,7 @@
3739
import rx.operators.OperationDematerialize;
3840
import rx.operators.OperationFilter;
3941
import rx.operators.OperationFinally;
42+
import rx.operators.OperationFirstOrDefault;
4043
import rx.operators.OperationGroupBy;
4144
import rx.operators.OperationInterval;
4245
import rx.operators.OperationMap;
@@ -3322,6 +3325,63 @@ public Observable<T> skip(int num) {
33223325
return create(OperationSkip.skip(this, num));
33233326
}
33243327

3328+
/**
3329+
* Returns an Observable that emits only the very first item emitted by the source Observable.
3330+
*
3331+
* @return an Observable that emits only the very first item from the source, or none if the
3332+
* source Observable completes without emitting a single item.
3333+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3334+
*/
3335+
public Observable<T> first() {
3336+
return take(1);
3337+
}
3338+
3339+
/**
3340+
* Returns an Observable that emits only the very first item emitted by the source Observable
3341+
* that satisfies a given condition.
3342+
*
3343+
* @param predicate
3344+
* The condition any source emitted item has to satisfy.
3345+
* @return an Observable that emits only the very first item satisfying the given condition from the source,
3346+
* or none if the source Observable completes without emitting a single matching item.
3347+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3348+
*/
3349+
public Observable<T> first(Func1<? super T, Boolean> predicate) {
3350+
return skipWhile(not(predicate)).take(1);
3351+
}
3352+
3353+
/**
3354+
* Returns an Observable that emits only the very first item emitted by the source Observable, or
3355+
* a default value.
3356+
*
3357+
* @param defaultValue
3358+
* The default value to emit if the source Observable doesn't emit anything.
3359+
* @return an Observable that emits only the very first item from the source, or a default value
3360+
* if the source Observable completes without emitting a single item.
3361+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320%28v=vs.103%29.aspx">MSDN: Observable.FirstOrDefault</a>
3362+
*/
3363+
public Observable<T> firstOrDefault(T defaultValue) {
3364+
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
3365+
}
3366+
3367+
/**
3368+
* Returns an Observable that emits only the very first item emitted by the source Observable
3369+
* that satisfies a given condition, or a default value otherwise.
3370+
*
3371+
* @param predicate
3372+
* The condition any source emitted item has to satisfy.
3373+
* @param defaultValue
3374+
* The default value to emit if the source Observable doesn't emit anything that
3375+
* satisfies the given condition.
3376+
* @return an Observable that emits only the very first item from the source that satisfies the
3377+
* given condition, or a default value otherwise.
3378+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759%28v=vs.103%29.aspx">MSDN: Observable.FirstOrDefault</a>
3379+
*/
3380+
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
3381+
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
3382+
}
3383+
3384+
33253385
/**
33263386
* Returns an Observable that emits only the first <code>num</code> items emitted by the source
33273387
* Observable.
@@ -3374,6 +3434,33 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
33743434
return create(OperationTakeWhile.takeWhileWithIndex(this, predicate));
33753435
}
33763436

3437+
/**
3438+
* Returns an Observable that emits only the very first item emitted by the source Observable.
3439+
*
3440+
* @return an Observable that emits only the very first item from the source, or none if the
3441+
* source Observable completes without emitting a single item.
3442+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3443+
* @see {@link #first()}
3444+
*/
3445+
public Observable<T> takeFirst() {
3446+
return first();
3447+
}
3448+
3449+
/**
3450+
* Returns an Observable that emits only the very first item emitted by the source Observable
3451+
* that satisfies a given condition.
3452+
*
3453+
* @param predicate
3454+
* The condition any source emitted item has to satisfy.
3455+
* @return an Observable that emits only the very first item satisfying the given condition from the source,
3456+
* or none if the source Observable completes without emitting a single matching item.
3457+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3458+
* @see {@link #first(Func1)}
3459+
*/
3460+
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
3461+
return first(predicate);
3462+
}
3463+
33773464
/**
33783465
* Returns an Observable that emits only the last <code>count</code> items emitted by the source
33793466
* Observable.
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
import static rx.Observable.create;
22+
import static rx.Observable.empty;
23+
import static rx.Observable.from;
24+
import static rx.util.functions.Functions.alwaysTrue;
25+
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.Mock;
31+
32+
import rx.Observable;
33+
import rx.Observable.OnSubscribeFunc;
34+
import rx.Observer;
35+
import rx.Subscription;
36+
import rx.subscriptions.Subscriptions;
37+
import rx.util.functions.Action0;
38+
import rx.util.functions.Func1;
39+
40+
/**
41+
* Returns an Observable that emits the first item emitted by the source
42+
* Observable, or a default value if the source emits nothing.
43+
*/
44+
public final class OperationFirstOrDefault {
45+
46+
/**
47+
* Returns an Observable that emits the first item emitted by the source
48+
* Observable that satisfies the given condition,
49+
* or a default value if the source emits no items that satisfy the given condition.
50+
*
51+
* @param source
52+
* The source Observable to emit the first item for.
53+
* @param predicate
54+
* The condition the emitted source items have to satisfy.
55+
* @param defaultValue
56+
* The default value to use whenever the source Observable doesn't emit anything.
57+
* @return A subscription function for creating the target Observable.
58+
*/
59+
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
60+
return new FirstOrElse<T>(source, predicate, defaultValue);
61+
}
62+
63+
/**
64+
* Returns an Observable that emits the first item emitted by the source
65+
* Observable, or a default value if the source emits nothing.
66+
*
67+
* @param source
68+
* The source Observable to emit the first item for.
69+
* @param defaultValue
70+
* The default value to use whenever the source Observable doesn't emit anything.
71+
* @return A subscription function for creating the target Observable.
72+
*/
73+
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, T defaultValue) {
74+
return new FirstOrElse<T>(source, alwaysTrue(), defaultValue);
75+
}
76+
77+
private static class FirstOrElse<T> implements OnSubscribeFunc<T> {
78+
private final Observable<? extends T> source;
79+
private final Func1<? super T, Boolean> predicate;
80+
private final T defaultValue;
81+
82+
private FirstOrElse(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
83+
this.source = source;
84+
this.defaultValue = defaultValue;
85+
this.predicate = predicate;
86+
}
87+
88+
@Override
89+
public Subscription onSubscribe(final Observer<? super T> observer) {
90+
final Subscription sourceSub = source.subscribe(new Observer<T>() {
91+
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
92+
93+
@Override
94+
public void onCompleted() {
95+
if (!hasEmitted.get()) {
96+
observer.onNext(defaultValue);
97+
observer.onCompleted();
98+
}
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
observer.onError(e);
104+
}
105+
106+
@Override
107+
public void onNext(T next) {
108+
try {
109+
if (!hasEmitted.get() && predicate.call(next)) {
110+
hasEmitted.set(true);
111+
observer.onNext(next);
112+
observer.onCompleted();
113+
}
114+
} catch (Throwable t) {
115+
// may happen within the predicate call (user code)
116+
observer.onError(t);
117+
}
118+
}
119+
});
120+
121+
return Subscriptions.create(new Action0() {
122+
@Override
123+
public void call() {
124+
sourceSub.unsubscribe();
125+
}
126+
});
127+
}
128+
}
129+
130+
public static class UnitTest {
131+
@Mock
132+
Observer<? super String> w;
133+
134+
private static final Func1<String, Boolean> IS_D = new Func1<String, Boolean>() {
135+
@Override
136+
public Boolean call(String value) {
137+
return "d".equals(value);
138+
}
139+
};
140+
141+
@Before
142+
public void before() {
143+
initMocks(this);
144+
}
145+
146+
@Test
147+
public void testFirstOrElseOfNone() {
148+
Observable<String> src = empty();
149+
create(firstOrDefault(src, "default")).subscribe(w);
150+
151+
verify(w, times(1)).onNext(anyString());
152+
verify(w, times(1)).onNext("default");
153+
verify(w, never()).onError(any(Throwable.class));
154+
verify(w, times(1)).onCompleted();
155+
}
156+
157+
@Test
158+
public void testFirstOrElseOfSome() {
159+
Observable<String> src = from("a", "b", "c");
160+
create(firstOrDefault(src, "default")).subscribe(w);
161+
162+
verify(w, times(1)).onNext(anyString());
163+
verify(w, times(1)).onNext("a");
164+
verify(w, never()).onError(any(Throwable.class));
165+
verify(w, times(1)).onCompleted();
166+
}
167+
168+
@Test
169+
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() {
170+
Observable<String> src = from("a", "b", "c");
171+
create(firstOrDefault(src, IS_D, "default")).subscribe(w);
172+
173+
verify(w, times(1)).onNext(anyString());
174+
verify(w, times(1)).onNext("default");
175+
verify(w, never()).onError(any(Throwable.class));
176+
verify(w, times(1)).onCompleted();
177+
}
178+
179+
@Test
180+
public void testFirstOrElseWithPredicateOfSome() {
181+
Observable<String> src = from("a", "b", "c", "d", "e", "f");
182+
create(firstOrDefault(src, IS_D, "default")).subscribe(w);
183+
184+
verify(w, times(1)).onNext(anyString());
185+
verify(w, times(1)).onNext("d");
186+
verify(w, never()).onError(any(Throwable.class));
187+
verify(w, times(1)).onCompleted();
188+
}
189+
}
190+
}

rxjava-core/src/main/java/rx/util/functions/Functions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,16 @@ public Void call(Object... args) {
313313
};
314314
}
315315

316+
/**
317+
* Constructs a predicate that returns true for each input that the source
318+
* predicate returns false for and vice versa.
319+
*
320+
* @param predicate The source predicate to negate.
321+
*/
322+
public static <T> Func1<T, Boolean> not(Func1<? super T, Boolean> predicate) {
323+
return new Not<T>(predicate);
324+
}
325+
316326
public static <T> Func1<? super T, Boolean> alwaysTrue() {
317327
return AlwaysTrue.INSTANCE;
318328
}
@@ -334,4 +344,5 @@ public Boolean call(Object o) {
334344
return true;
335345
}
336346
}
347+
337348
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.util.functions;
17+
18+
/**
19+
* Implements the negation of a predicate.
20+
*
21+
* @param <T> The type of the single input parameter.
22+
*/
23+
public class Not<T> implements Func1<T, Boolean> {
24+
private final Func1<? super T, Boolean> predicate;
25+
26+
/**
27+
* Constructs a predicate that returns true for each input that the source
28+
* predicate returns false for and vice versa.
29+
*
30+
* @param predicate The source predicate to negate.
31+
*/
32+
public Not(Func1<? super T, Boolean> predicate) {
33+
this.predicate = predicate;
34+
}
35+
36+
@Override
37+
public Boolean call(T param) {
38+
return !predicate.call(param);
39+
}
40+
}

0 commit comments

Comments
 (0)