Skip to content

Commit 2cf2bab

Browse files
committed
Change Observable methods to return Single, Maybe, or Completable in cases where is it returning a 1 or 0 values.
1 parent ff3c5d0 commit 2cf2bab

File tree

58 files changed

+659
-1305
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+659
-1305
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 52 additions & 118 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,23 @@ public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends
165165
ObjectHelper.verifyPositive(prefetch, "prefetch");
166166
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
167167
}
168+
169+
/**
170+
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
171+
* a Publisher sequence.
172+
* <dl>
173+
* <dt><b>Scheduler:</b></dt>
174+
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
175+
* </dl>
176+
* @param <T> the value type
177+
* @param sources the Publisher of SingleSource instances
178+
* @return the new Flowable instance
179+
* @since 2.0
180+
*/
181+
@SuppressWarnings({ "unchecked", "rawtypes" })
182+
public static <T> Observable<T> concat(Observable<? extends SingleSource<? extends T>> sources) {
183+
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
184+
}
168185

169186
/**
170187
* Returns a Flowable that emits the items emitted by two Singles, one after the other.
@@ -1702,6 +1719,26 @@ public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<?
17021719
return RxJavaPlugins.onAssembly(new SingleFlatMap<T, R>(this, mapper));
17031720
}
17041721

1722+
/**
1723+
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
1724+
* where that function returns a SingleSource.
1725+
* <p>
1726+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
1727+
* <dl>
1728+
* <dt><b>Scheduler:</b></dt>
1729+
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
1730+
* </dl>
1731+
*
1732+
* @param <R> the result value type
1733+
* @param mapper
1734+
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
1735+
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
1736+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1737+
*/
1738+
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
1739+
return toObservable().flatMap(mapper);
1740+
}
1741+
17051742
/**
17061743
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
17071744
* source Single, where that function returns a Publisher.
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.maybe;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
18+
19+
/**
20+
* Base class for operators with a source consumable.
21+
*
22+
* @param <T> the input source type
23+
* @param <U> the output type
24+
*/
25+
public abstract class AbstractMaybeWithUpstreamObservable<T, U> extends Maybe<U> implements HasUpstreamObservableSource<T> {
26+
27+
/** The source consumable Observable. */
28+
protected final ObservableSource<T> source;
29+
30+
/**
31+
* Constructs the ObservableSource with the given consumable.
32+
* @param source the consumable Observable
33+
*/
34+
public AbstractMaybeWithUpstreamObservable(ObservableSource<T> source) {
35+
this.source = source;
36+
}
37+
38+
@Override
39+
public final ObservableSource<T> source() {
40+
return source;
41+
}
42+
43+
}

src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.functions.BiConsumer;
2020
import io.reactivex.internal.disposables.*;
21+
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;
2122
import io.reactivex.plugins.RxJavaPlugins;
2223

23-
public final class ObservableCollect<T, U> extends AbstractObservableWithUpstream<T, U> {
24+
public final class ObservableCollect<T, U> extends AbstractSingleWithUpstreamObservable<T, U> {
2425
final Callable<? extends U> initialSupplier;
2526
final BiConsumer<? super U, ? super T> collector;
2627

@@ -32,7 +33,7 @@ public ObservableCollect(ObservableSource<T> source,
3233
}
3334

3435
@Override
35-
protected void subscribeActual(Observer<? super U> t) {
36+
protected void subscribeActual(SingleObserver<? super U> t) {
3637
U u;
3738
try {
3839
u = initialSupplier.call();
@@ -51,15 +52,15 @@ protected void subscribeActual(Observer<? super U> t) {
5152
}
5253

5354
static final class CollectSubscriber<T, U> implements Observer<T>, Disposable {
54-
final Observer<? super U> actual;
55+
final SingleObserver<? super U> actual;
5556
final BiConsumer<? super U, ? super T> collector;
5657
final U u;
5758

5859
Disposable s;
5960

6061
boolean done;
6162

62-
public CollectSubscriber(Observer<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
63+
public CollectSubscriber(SingleObserver<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
6364
this.actual = actual;
6465
this.collector = collector;
6566
this.u = u;
@@ -114,8 +115,7 @@ public void onComplete() {
114115
return;
115116
}
116117
done = true;
117-
actual.onNext(u);
118-
actual.onComplete();
118+
actual.onSuccess(u);
119119
}
120120
}
121121
}

src/main/java/io/reactivex/internal/operators/observable/ObservableCount.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,26 @@
1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;
1920

20-
public final class ObservableCount<T> extends AbstractObservableWithUpstream<T, Long> {
21+
public final class ObservableCount<T> extends AbstractSingleWithUpstreamObservable<T, Long> {
2122
public ObservableCount(ObservableSource<T> source) {
2223
super(source);
2324
}
2425

2526
@Override
26-
public void subscribeActual(Observer<? super Long> t) {
27+
public void subscribeActual(SingleObserver<? super Long> t) {
2728
source.subscribe(new CountSubscriber(t));
2829
}
2930

3031
static final class CountSubscriber implements Observer<Object>, Disposable {
31-
final Observer<? super Long> actual;
32+
final SingleObserver<? super Long> actual;
3233

3334
Disposable s;
3435

3536
long count;
3637

37-
public CountSubscriber(Observer<? super Long> actual) {
38+
public CountSubscriber(SingleObserver<? super Long> actual) {
3839
this.actual = actual;
3940
}
4041

@@ -69,8 +70,7 @@ public void onError(Throwable t) {
6970

7071
@Override
7172
public void onComplete() {
72-
actual.onNext(count);
73-
actual.onComplete();
73+
actual.onSuccess(count);
7474
}
7575
}
7676
}

src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java

Lines changed: 10 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,35 +16,33 @@
1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.operators.maybe.AbstractMaybeWithUpstreamObservable;
1920

20-
public final class ObservableElementAt<T> extends AbstractObservableWithUpstream<T, T> {
21+
public final class ObservableElementAt<T> extends AbstractMaybeWithUpstreamObservable<T, T> {
2122
final long index;
22-
final T defaultValue;
23-
public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue) {
23+
public ObservableElementAt(ObservableSource<T> source, long index) {
2424
super(source);
2525
this.index = index;
26-
this.defaultValue = defaultValue;
2726
}
27+
2828
@Override
29-
public void subscribeActual(Observer<? super T> t) {
30-
source.subscribe(new ElementAtSubscriber<T>(t, index, defaultValue));
29+
public void subscribeActual(MaybeObserver<? super T> t) {
30+
source.subscribe(new ElementAtSubscriber<T>(t, index));
3131
}
3232

3333
static final class ElementAtSubscriber<T> implements Observer<T>, Disposable {
34-
final Observer<? super T> actual;
34+
final MaybeObserver<? super T> actual;
3535
final long index;
36-
final T defaultValue;
3736

3837
Disposable s;
3938

4039
long count;
4140

4241
boolean done;
4342

44-
public ElementAtSubscriber(Observer<? super T> actual, long index, T defaultValue) {
43+
public ElementAtSubscriber(MaybeObserver<? super T> actual, long index) {
4544
this.actual = actual;
4645
this.index = index;
47-
this.defaultValue = defaultValue;
4846
}
4947

5048
@Override
@@ -76,8 +74,7 @@ public void onNext(T t) {
7674
if (c == index) {
7775
done = true;
7876
s.dispose();
79-
actual.onNext(t);
80-
actual.onComplete();
77+
actual.onSuccess(t);
8178
return;
8279
}
8380
count = c + 1;
@@ -96,13 +93,7 @@ public void onError(Throwable t) {
9693
public void onComplete() {
9794
if (index <= count && !done) {
9895
done = true;
99-
T v = defaultValue;
100-
if (v == null) {
101-
actual.onError(new IndexOutOfBoundsException());
102-
} else {
103-
actual.onNext(v);
104-
actual.onComplete();
105-
}
96+
actual.onComplete();
10697
}
10798
}
10899
}

src/main/java/io/reactivex/internal/operators/observable/ObservableSingle.java renamed to src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtWithDefualt.java

Lines changed: 22 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -13,37 +13,40 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import java.util.NoSuchElementException;
17-
1816
import io.reactivex.*;
1917
import io.reactivex.disposables.Disposable;
2018
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;
2120

22-
public final class ObservableSingle<T> extends AbstractObservableWithUpstream<T, T> {
23-
21+
public final class ObservableElementAtWithDefualt<T> extends AbstractSingleWithUpstreamObservable<T, T> {
22+
final long index;
2423
final T defaultValue;
25-
26-
public ObservableSingle(ObservableSource<T> source, T defaultValue) {
24+
25+
public ObservableElementAtWithDefualt(ObservableSource<T> source, long index, T defaultValue) {
2726
super(source);
27+
this.index = index;
2828
this.defaultValue = defaultValue;
2929
}
30+
3031
@Override
31-
public void subscribeActual(Observer<? super T> t) {
32-
source.subscribe(new SingleElementSubscriber<T>(t, defaultValue));
32+
public void subscribeActual(SingleObserver<? super T> t) {
33+
source.subscribe(new ElementAtSubscriber<T>(t, index, defaultValue));
3334
}
3435

35-
static final class SingleElementSubscriber<T> implements Observer<T>, Disposable {
36-
final Observer<? super T> actual;
36+
static final class ElementAtSubscriber<T> implements Observer<T>, Disposable {
37+
final SingleObserver<? super T> actual;
38+
final long index;
3739
final T defaultValue;
3840

3941
Disposable s;
4042

41-
T value;
43+
long count;
4244

4345
boolean done;
4446

45-
public SingleElementSubscriber(Observer<? super T> actual, T defaultValue) {
47+
public ElementAtSubscriber(SingleObserver<? super T> actual, long index, T defaultValue) {
4648
this.actual = actual;
49+
this.index = index;
4750
this.defaultValue = defaultValue;
4851
}
4952

@@ -72,13 +75,14 @@ public void onNext(T t) {
7275
if (done) {
7376
return;
7477
}
75-
if (value != null) {
78+
long c = count;
79+
if (c == index) {
7680
done = true;
7781
s.dispose();
78-
actual.onError(new IllegalArgumentException("Sequence contains more than one element!"));
82+
actual.onSuccess(t);
7983
return;
8084
}
81-
value = t;
85+
count = c + 1;
8286
}
8387

8488
@Override
@@ -92,20 +96,9 @@ public void onError(Throwable t) {
9296

9397
@Override
9498
public void onComplete() {
95-
if (done) {
96-
return;
97-
}
98-
done = true;
99-
T v = value;
100-
value = null;
101-
if (v == null) {
102-
v = defaultValue;
103-
}
104-
if (v == null) {
105-
actual.onError(new NoSuchElementException());
106-
} else {
107-
actual.onNext(v);
108-
actual.onComplete();
99+
if (index <= count && !done) {
100+
done = true;
101+
actual.onSuccess(defaultValue);
109102
}
110103
}
111104
}

0 commit comments

Comments
 (0)