Skip to content

2.x: convert the Observable operators to return Single/Maybe #4579

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.reactivex.internal.fuseable.ScalarCallable;
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
import io.reactivex.internal.operators.single.SingleReduceFlowable;
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
import io.reactivex.internal.subscribers.*;
import io.reactivex.internal.util.*;
Expand Down Expand Up @@ -9847,7 +9846,7 @@ public final Flowable<T> rebatchRequests(int n) {
}

/**
* Returns a Single that applies a specified accumulator function to the first item emitted by a source
* Returns a Maybe that applies a specified accumulator function to the first item emitted by a source
* Publisher, then feeds the result of that function along with the second item emitted by the source
* Publisher into the same function, and so on until all items have been emitted by the source Publisher,
* and emits the final result from the final call to your function as its sole item.
Expand All @@ -9870,16 +9869,16 @@ public final Flowable<T> rebatchRequests(int n) {
* @param reducer
* an accumulator function to be invoked on each item emitted by the source Publisher, whose
* result will be used in the next accumulator call
* @return a Single that emits a single item that is the result of accumulating the items emitted by
* @return a Maybe that emits a single item that is the result of accumulating the items emitted by
* the source Flowable
* @see <a href="http://reactivex.io/documentation/operators/reduce.html">ReactiveX operators documentation: Reduce</a>
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Uncyclopedia: Fold (higher-order function)</a>
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> reduce(BiFunction<T, T, T> reducer) {
public final Maybe<T> reduce(BiFunction<T, T, T> reducer) {
ObjectHelper.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new SingleReduceFlowable<T>(this, reducer));
return RxJavaPlugins.onAssembly(new FlowableReduceMaybe<T>(this, reducer));
}

/**
Expand Down
190 changes: 53 additions & 137 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static final class LastSubscriber<T> implements Subscriber<T>, Disposable {

T item;

public LastSubscriber(MaybeObserver<? super T> actual) {
LastSubscriber(MaybeObserver<? super T> actual) {
this.actual = actual;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ static final class LastSubscriber<T> implements Subscriber<T>, Disposable {

T item;

public LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
this.actual = actual;
this.defaultItem = defaultItem;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.single;

import java.util.NoSuchElementException;
package io.reactivex.internal.operators.flowable;

import org.reactivestreams.*;

Expand All @@ -23,7 +21,6 @@
import io.reactivex.functions.BiFunction;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.operators.flowable.FlowableReduce;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

Expand All @@ -32,15 +29,15 @@
*
* @param <T> the value type
*/
public final class SingleReduceFlowable<T>
extends Single<T>
public final class FlowableReduceMaybe<T>
extends Maybe<T>
implements HasUpstreamPublisher<T>, FuseToFlowable<T> {

final Flowable<T> source;

final BiFunction<T, T, T> reducer;

public SingleReduceFlowable(Flowable<T> source, BiFunction<T, T, T> reducer) {
public FlowableReduceMaybe(Flowable<T> source, BiFunction<T, T, T> reducer) {
this.source = source;
this.reducer = reducer;
}
Expand All @@ -52,17 +49,16 @@ public Publisher<T> source() {

@Override
public Flowable<T> fuseToFlowable() {
// return RxJavaPlugins.onAssembly(new SingleToFlowable<T>(this));
return RxJavaPlugins.onAssembly(new FlowableReduce<T>(source, reducer));
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new ReduceSubscriber<T>(observer, reducer));
}

static final class ReduceSubscriber<T> implements Subscriber<T>, Disposable {
final SingleObserver<? super T> actual;
final MaybeObserver<? super T> actual;

final BiFunction<T, T, T> reducer;

Expand All @@ -72,7 +68,7 @@ static final class ReduceSubscriber<T> implements Subscriber<T>, Disposable {

boolean done;

ReduceSubscriber(SingleObserver<? super T> actual, BiFunction<T, T, T> reducer) {
ReduceSubscriber(MaybeObserver<? super T> actual, BiFunction<T, T, T> reducer) {
this.actual = actual;
this.reducer = reducer;
}
Expand Down Expand Up @@ -139,7 +135,7 @@ public void onComplete() {
// value = null;
actual.onSuccess(v);
} else {
actual.onError(new NoSuchElementException());
actual.onComplete();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.FuseToObservable;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableCountSingle<T> extends Single<Long> implements FuseToObservable<Long> {
final ObservableSource<T> source;
public ObservableCountSingle(ObservableSource<T> source) {
this.source = source;
}

@Override
public void subscribeActual(SingleObserver<? super Long> t) {
source.subscribe(new CountObserver(t));
}

@Override
public Observable<Long> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableCount<T>(source));
}

static final class CountObserver implements Observer<Object>, Disposable {
final SingleObserver<? super Long> actual;

Disposable d;

long count;

CountObserver(SingleObserver<? super Long> actual) {
this.actual = actual;
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.d, d)) {
this.d = d;
actual.onSubscribe(this);
}
}


@Override
public void dispose() {
d.dispose();
d = DisposableHelper.DISPOSED;
}

@Override
public boolean isDisposed() {
return d.isDisposed();
}

@Override
public void onNext(Object t) {
count++;
}

@Override
public void onError(Throwable t) {
d = DisposableHelper.DISPOSED;
actual.onError(t);
}

@Override
public void onComplete() {
d = DisposableHelper.DISPOSED;
actual.onSuccess(count);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,10 @@ public void onComplete() {
if (index <= count && !done) {
done = true;
T v = defaultValue;
if (v == null) {
actual.onError(new IndexOutOfBoundsException());
} else {
if (v != null) {
actual.onNext(v);
actual.onComplete();
}
actual.onComplete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.FuseToObservable;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableElementAtMaybe<T> extends Maybe<T> implements FuseToObservable<T> {
final ObservableSource<T> source;
final long index;
public ObservableElementAtMaybe(ObservableSource<T> source, long index) {
this.source = source;
this.index = index;
}
@Override
public void subscribeActual(MaybeObserver<? super T> t) {
source.subscribe(new ElementAtObserver<T>(t, index));
}

@Override
public Observable<T> fuseToObservable() {
return RxJavaPlugins.onAssembly(new ObservableElementAt<T>(source, index, null));
}

static final class ElementAtObserver<T> implements Observer<T>, Disposable {
final MaybeObserver<? super T> actual;
final long index;

Disposable s;

long count;

boolean done;

ElementAtObserver(MaybeObserver<? super T> actual, long index) {
this.actual = actual;
this.index = index;
}

@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
actual.onSubscribe(this);
}
}


@Override
public void dispose() {
s.dispose();
}

@Override
public boolean isDisposed() {
return s.isDisposed();
}


@Override
public void onNext(T t) {
if (done) {
return;
}
long c = count;
if (c == index) {
done = true;
s.dispose();
actual.onSuccess(t);
return;
}
count = c + 1;
}

@Override
public void onError(Throwable t) {
if (done) {
return;
}
done = true;
actual.onError(t);
}

@Override
public void onComplete() {
if (index <= count && !done) {
done = true;
actual.onComplete();
}
}
}
}
Loading