Skip to content

Commit 4452518

Browse files
author
jmhofer
committed
Added FirstOrDefault operation
1 parent bd38371 commit 4452518

File tree

2 files changed

+226
-3
lines changed

2 files changed

+226
-3
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import rx.operators.OperationDematerialize;
3838
import rx.operators.OperationFilter;
3939
import rx.operators.OperationFinally;
40+
import rx.operators.OperationFirstOrDefault;
4041
import rx.operators.OperationGroupBy;
4142
import rx.operators.OperationMap;
4243
import rx.operators.OperationMaterialize;
@@ -2288,15 +2289,47 @@ public Observable<T> first() {
22882289
}
22892290

22902291
/**
2291-
* Returns an Observable that emits only the very first item emitted by the source Observable.
2292-
* @return an Observable that emits only the very first item from the source, or none if the
2293-
* source Observable completes without emitting a single item.
2292+
* Returns an Observable that emits only the very first item emitted by the source Observable
2293+
* that satisfies a given condition.
2294+
* @param predicate
2295+
* The condition any source emitted item has to satisfy.
2296+
* @return an Observable that emits only the very first item satisfying the given condition from the source,
2297+
* or none if the source Observable completes without emitting a single matching item.
22942298
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
22952299
*/
22962300
public Observable<T> first(Func1<? super T, Boolean> predicate) {
22972301
return skipWhile(not(predicate)).take(1);
22982302
}
22992303

2304+
/**
2305+
* Returns an Observable that emits only the very first item emitted by the source Observable, or
2306+
* a default value.
2307+
* @param defaultValue
2308+
* The default value to emit if the source Observable doesn't emit anything.
2309+
* @return an Observable that emits only the very first item from the source, or a default value
2310+
* if the source Observable completes without emitting a single item.
2311+
* @see <a href="">MSDN: Observable.FirstOrDefault</a>
2312+
*/
2313+
public Observable<T> firstOrDefault(T defaultValue) {
2314+
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
2315+
}
2316+
2317+
/**
2318+
* Returns an Observable that emits only the very first item emitted by the source Observable
2319+
* that satisfies a given condition, or a default value otherwise.
2320+
* @param predicate
2321+
* The condition any source emitted item has to satisfy.
2322+
* @param defaultValue
2323+
* The default value to emit if the source Observable doesn't emit anything that
2324+
* satisfies the given condition.
2325+
* @return an Observable that emits only the very first item from the source that satisfies the
2326+
* given condition, or a default value otherwise.
2327+
* @see <a href="">MSDN: Observable.FirstOrDefault</a>
2328+
*/
2329+
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
2330+
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
2331+
}
2332+
23002333
/**
23012334
* Returns an Observable that emits items emitted by the source Observable so long as a
23022335
* specified condition is true.
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
import static rx.Observable.create;
22+
import static rx.Observable.empty;
23+
import static rx.Observable.from;
24+
import static rx.util.functions.Functions.alwaysTrue;
25+
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.Mock;
31+
32+
import rx.Observable;
33+
import rx.Observable.OnSubscribeFunc;
34+
import rx.Observer;
35+
import rx.Subscription;
36+
import rx.subscriptions.Subscriptions;
37+
import rx.util.functions.Action0;
38+
import rx.util.functions.Func1;
39+
40+
/**
41+
* Returns an Observable that emits the first item emitted by the source
42+
* Observable, or a default value if the source emits nothing.
43+
*/
44+
public final class OperationFirstOrDefault {
45+
46+
/**
47+
* Returns an Observable that emits the first item emitted by the source
48+
* Observable that satisfies the given condition,
49+
* or a default value if the source emits no items that satisfy the given condition.
50+
*
51+
* @param source
52+
* The source Observable to emit the first item for.
53+
* @param predicate
54+
* The condition the emitted source items have to satisfy.
55+
* @param defaultValue
56+
* The default value to use whenever the source Observable doesn't emit anything.
57+
* @return A subscription function for creating the target Observable.
58+
*/
59+
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
60+
return new FirstOrElse<T>(source, predicate, defaultValue);
61+
}
62+
63+
/**
64+
* Returns an Observable that emits the first item emitted by the source
65+
* Observable, or a default value if the source emits nothing.
66+
*
67+
* @param source
68+
* The source Observable to emit the first item for.
69+
* @param defaultValue
70+
* The default value to use whenever the source Observable doesn't emit anything.
71+
* @return A subscription function for creating the target Observable.
72+
*/
73+
public static <T> OnSubscribeFunc<T> firstOrDefault(Observable<? extends T> source, T defaultValue) {
74+
return new FirstOrElse<T>(source, alwaysTrue(), defaultValue);
75+
}
76+
77+
private static class FirstOrElse<T> implements OnSubscribeFunc<T> {
78+
private final Observable<? extends T> source;
79+
private final Func1<? super T, Boolean> predicate;
80+
private final T defaultValue;
81+
82+
private FirstOrElse(Observable<? extends T> source, Func1<? super T, Boolean> predicate, T defaultValue) {
83+
this.source = source;
84+
this.defaultValue = defaultValue;
85+
this.predicate = predicate;
86+
}
87+
88+
@Override
89+
public Subscription onSubscribe(final Observer<? super T> observer) {
90+
final Subscription sourceSub = source.subscribe(new Observer<T>() {
91+
private final AtomicBoolean hasEmitted = new AtomicBoolean(false);
92+
93+
@Override
94+
public void onCompleted() {
95+
if (!hasEmitted.get()) {
96+
observer.onNext(defaultValue);
97+
observer.onCompleted();
98+
}
99+
}
100+
101+
@Override
102+
public void onError(Throwable e) {
103+
observer.onError(e);
104+
}
105+
106+
@Override
107+
public void onNext(T next) {
108+
try {
109+
if (!hasEmitted.get() && predicate.call(next)) {
110+
hasEmitted.set(true);
111+
observer.onNext(next);
112+
observer.onCompleted();
113+
}
114+
} catch (Throwable t) {
115+
// may happen within the predicate call (user code)
116+
observer.onError(t);
117+
}
118+
}
119+
});
120+
121+
return Subscriptions.create(new Action0() {
122+
@Override
123+
public void call() {
124+
sourceSub.unsubscribe();
125+
}
126+
});
127+
}
128+
}
129+
130+
public static class UnitTest {
131+
@Mock
132+
Observer<? super String> w;
133+
134+
private static final Func1<String, Boolean> IS_D = new Func1<String, Boolean>() {
135+
@Override
136+
public Boolean call(String value) {
137+
return "d".equals(value);
138+
}
139+
};
140+
141+
@Before
142+
public void before() {
143+
initMocks(this);
144+
}
145+
146+
@Test
147+
public void testFirstOrElseOfNone() {
148+
Observable<String> src = empty();
149+
create(firstOrDefault(src, "default")).subscribe(w);
150+
151+
verify(w, times(1)).onNext(anyString());
152+
verify(w, times(1)).onNext("default");
153+
verify(w, never()).onError(any(Throwable.class));
154+
verify(w, times(1)).onCompleted();
155+
}
156+
157+
@Test
158+
public void testFirstOrElseOfSome() {
159+
Observable<String> src = from("a", "b", "c");
160+
create(firstOrDefault(src, "default")).subscribe(w);
161+
162+
verify(w, times(1)).onNext(anyString());
163+
verify(w, times(1)).onNext("a");
164+
verify(w, never()).onError(any(Throwable.class));
165+
verify(w, times(1)).onCompleted();
166+
}
167+
168+
@Test
169+
public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() {
170+
Observable<String> src = from("a", "b", "c");
171+
create(firstOrDefault(src, IS_D, "default")).subscribe(w);
172+
173+
verify(w, times(1)).onNext(anyString());
174+
verify(w, times(1)).onNext("default");
175+
verify(w, never()).onError(any(Throwable.class));
176+
verify(w, times(1)).onCompleted();
177+
}
178+
179+
@Test
180+
public void testFirstOrElseWithPredicateOfSome() {
181+
Observable<String> src = from("a", "b", "c", "d", "e", "f");
182+
create(firstOrDefault(src, IS_D, "default")).subscribe(w);
183+
184+
verify(w, times(1)).onNext(anyString());
185+
verify(w, times(1)).onNext("d");
186+
verify(w, never()).onError(any(Throwable.class));
187+
verify(w, times(1)).onCompleted();
188+
}
189+
}
190+
}

0 commit comments

Comments
 (0)