Skip to content

Commit 31ed500

Browse files
authored
2.x: convert the Observable operators to return Single/Maybe (#4579)
1 parent 470eddb commit 31ed500

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+1800
-382
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.reactivex.internal.fuseable.ScalarCallable;
2727
import io.reactivex.internal.operators.flowable.*;
2828
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
29-
import io.reactivex.internal.operators.single.SingleReduceFlowable;
3029
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3130
import io.reactivex.internal.subscribers.*;
3231
import io.reactivex.internal.util.*;
@@ -9847,7 +9846,7 @@ public final Flowable<T> rebatchRequests(int n) {
98479846
}
98489847

98499848
/**
9850-
* Returns a Single that applies a specified accumulator function to the first item emitted by a source
9849+
* Returns a Maybe that applies a specified accumulator function to the first item emitted by a source
98519850
* Publisher, then feeds the result of that function along with the second item emitted by the source
98529851
* Publisher into the same function, and so on until all items have been emitted by the source Publisher,
98539852
* and emits the final result from the final call to your function as its sole item.
@@ -9870,16 +9869,16 @@ public final Flowable<T> rebatchRequests(int n) {
98709869
* @param reducer
98719870
* an accumulator function to be invoked on each item emitted by the source Publisher, whose
98729871
* result will be used in the next accumulator call
9873-
* @return a Single that emits a single item that is the result of accumulating the items emitted by
9872+
* @return a Maybe that emits a single item that is the result of accumulating the items emitted by
98749873
* the source Flowable
98759874
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
98769875
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Uncyclopedia: Fold (higher-order function)</a>
98779876
*/
98789877
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
98799878
@SchedulerSupport(SchedulerSupport.NONE)
9880-
public final Single<T> reduce(BiFunction<T, T, T> reducer) {
9879+
public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
98819880
ObjectHelper.requireNonNull(reducer, "reducer is null");
9882-
return RxJavaPlugins.onAssembly(new SingleReduceFlowable<T>(this, reducer));
9881+
return RxJavaPlugins.onAssembly(new FlowableReduceMaybe<T>(this, reducer));
98839882
}
98849883

98859884
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 53 additions & 137 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/flowable/FlowableLastMaybe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ static final class LastSubscriber<T> implements Subscriber<T>, Disposable {
4747

4848
T item;
4949

50-
public LastSubscriber(MaybeObserver<? super T> actual) {
50+
LastSubscriber(MaybeObserver<? super T> actual) {
5151
this.actual = actual;
5252
}
5353

src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ static final class LastSubscriber<T> implements Subscriber<T>, Disposable {
5353

5454
T item;
5555

56-
public LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
56+
LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
5757
this.actual = actual;
5858
this.defaultItem = defaultItem;
5959
}

src/main/java/io/reactivex/internal/operators/single/SingleReduceFlowable.java renamed to src/main/java/io/reactivex/internal/operators/flowable/FlowableReduceMaybe.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.operators.single;
15-
16-
import java.util.NoSuchElementException;
14+
package io.reactivex.internal.operators.flowable;
1715

1816
import org.reactivestreams.*;
1917

@@ -23,7 +21,6 @@
2321
import io.reactivex.functions.BiFunction;
2422
import io.reactivex.internal.functions.ObjectHelper;
2523
import io.reactivex.internal.fuseable.*;
26-
import io.reactivex.internal.operators.flowable.FlowableReduce;
2724
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2825
import io.reactivex.plugins.RxJavaPlugins;
2926

@@ -32,15 +29,15 @@
3229
*
3330
* @param <T> the value type
3431
*/
35-
public final class SingleReduceFlowable<T>
36-
extends Single<T>
32+
public final class FlowableReduceMaybe<T>
33+
extends Maybe<T>
3734
implements HasUpstreamPublisher<T>, FuseToFlowable<T> {
3835

3936
final Flowable<T> source;
4037

4138
final BiFunction<T, T, T> reducer;
4239

43-
public SingleReduceFlowable(Flowable<T> source, BiFunction<T, T, T> reducer) {
40+
public FlowableReduceMaybe(Flowable<T> source, BiFunction<T, T, T> reducer) {
4441
this.source = source;
4542
this.reducer = reducer;
4643
}
@@ -52,17 +49,16 @@ public Publisher<T> source() {
5249

5350
@Override
5451
public Flowable<T> fuseToFlowable() {
55-
// return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
5652
return RxJavaPlugins.onAssembly(new FlowableReduce<T>(source, reducer));
5753
}
5854

5955
@Override
60-
protected void subscribeActual(SingleObserver<? super T> observer) {
56+
protected void subscribeActual(MaybeObserver<? super T> observer) {
6157
source.subscribe(new ReduceSubscriber<T>(observer, reducer));
6258
}
6359

6460
static final class ReduceSubscriber<T> implements Subscriber<T>, Disposable {
65-
final SingleObserver<? super T> actual;
61+
final MaybeObserver<? super T> actual;
6662

6763
final BiFunction<T, T, T> reducer;
6864

@@ -72,7 +68,7 @@ static final class ReduceSubscriber<T> implements Subscriber<T>, Disposable {
7268

7369
boolean done;
7470

75-
ReduceSubscriber(SingleObserver<? super T> actual, BiFunction<T, T, T> reducer) {
71+
ReduceSubscriber(MaybeObserver<? super T> actual, BiFunction<T, T, T> reducer) {
7672
this.actual = actual;
7773
this.reducer = reducer;
7874
}
@@ -139,7 +135,7 @@ public void onComplete() {
139135
// value = null;
140136
actual.onSuccess(v);
141137
} else {
142-
actual.onError(new NoSuchElementException());
138+
actual.onComplete();
143139
}
144140
}
145141

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.fuseable.FuseToObservable;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
public final class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
23+
final ObservableSource<T> source;
24+
public ObservableCountSingle(ObservableSource<T> source) {
25+
this.source = source;
26+
}
27+
28+
@Override
29+
public void subscribeActual(SingleObserver<? super Long> t) {
30+
source.subscribe(new CountObserver(t));
31+
}
32+
33+
@Override
34+
public Observable<Long> fuseToObservable() {
35+
return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
36+
}
37+
38+
static final class CountObserver implements Observer<Object>, Disposable {
39+
final SingleObserver<? super Long> actual;
40+
41+
Disposable d;
42+
43+
long count;
44+
45+
CountObserver(SingleObserver<? super Long> actual) {
46+
this.actual = actual;
47+
}
48+
49+
@Override
50+
public void onSubscribe(Disposable d) {
51+
if (DisposableHelper.validate(this.d, d)) {
52+
this.d = d;
53+
actual.onSubscribe(this);
54+
}
55+
}
56+
57+
58+
@Override
59+
public void dispose() {
60+
d.dispose();
61+
d = DisposableHelper.DISPOSED;
62+
}
63+
64+
@Override
65+
public boolean isDisposed() {
66+
return d.isDisposed();
67+
}
68+
69+
@Override
70+
public void onNext(Object t) {
71+
count++;
72+
}
73+
74+
@Override
75+
public void onError(Throwable t) {
76+
d = DisposableHelper.DISPOSED;
77+
actual.onError(t);
78+
}
79+
80+
@Override
81+
public void onComplete() {
82+
d = DisposableHelper.DISPOSED;
83+
actual.onSuccess(count);
84+
}
85+
}
86+
}

src/main/java/io/reactivex/internal/operators/observable/ObservableElementAt.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -97,12 +97,10 @@ public void onComplete() {
9797
if (index <= count && !done) {
9898
done = true;
9999
T v = defaultValue;
100-
if (v == null) {
101-
actual.onError(new IndexOutOfBoundsException());
102-
} else {
100+
if (v != null) {
103101
actual.onNext(v);
104-
actual.onComplete();
105102
}
103+
actual.onComplete();
106104
}
107105
}
108106
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import io.reactivex.*;
17+
import io.reactivex.disposables.Disposable;
18+
import io.reactivex.internal.disposables.DisposableHelper;
19+
import io.reactivex.internal.fuseable.FuseToObservable;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
public final class ObservableElementAtMaybe<T> extends Maybe<T> implements FuseToObservable<T> {
23+
final ObservableSource<T> source;
24+
final long index;
25+
public ObservableElementAtMaybe(ObservableSource<T> source, long index) {
26+
this.source = source;
27+
this.index = index;
28+
}
29+
@Override
30+
public void subscribeActual(MaybeObserver<? super T> t) {
31+
source.subscribe(new ElementAtObserver<T>(t, index));
32+
}
33+
34+
@Override
35+
public Observable<T> fuseToObservable() {
36+
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null));
37+
}
38+
39+
static final class ElementAtObserver<T> implements Observer<T>, Disposable {
40+
final MaybeObserver<? super T> actual;
41+
final long index;
42+
43+
Disposable s;
44+
45+
long count;
46+
47+
boolean done;
48+
49+
ElementAtObserver(MaybeObserver<? super T> actual, long index) {
50+
this.actual = actual;
51+
this.index = index;
52+
}
53+
54+
@Override
55+
public void onSubscribe(Disposable s) {
56+
if (DisposableHelper.validate(this.s, s)) {
57+
this.s = s;
58+
actual.onSubscribe(this);
59+
}
60+
}
61+
62+
63+
@Override
64+
public void dispose() {
65+
s.dispose();
66+
}
67+
68+
@Override
69+
public boolean isDisposed() {
70+
return s.isDisposed();
71+
}
72+
73+
74+
@Override
75+
public void onNext(T t) {
76+
if (done) {
77+
return;
78+
}
79+
long c = count;
80+
if (c == index) {
81+
done = true;
82+
s.dispose();
83+
actual.onSuccess(t);
84+
return;
85+
}
86+
count = c + 1;
87+
}
88+
89+
@Override
90+
public void onError(Throwable t) {
91+
if (done) {
92+
return;
93+
}
94+
done = true;
95+
actual.onError(t);
96+
}
97+
98+
@Override
99+
public void onComplete() {
100+
if (index <= count && !done) {
101+
done = true;
102+
actual.onComplete();
103+
}
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)