Skip to content

Integrate Maybe and Single into Observable. *** DO NOT MERGE #4481

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
170 changes: 52 additions & 118 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

37 changes: 37 additions & 0 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,23 @@ public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* a Publisher sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Publisher of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Observable<T> concat(Observable<? extends SingleSource<? extends T>> sources) {
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
}

/**
* Returns a Flowable that emits the items emitted by two Singles, one after the other.
Expand Down Expand Up @@ -1702,6 +1719,26 @@ public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<?
return RxJavaPlugins.onAssembly(new SingleFlatMap<T, R>(this, mapper));
}

/**
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
* where that function returns a SingleSource.
* <p>
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code flatMap} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the result value type
* @param mapper
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
*/
public final <R> Observable<R> flatMapObservable(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return toObservable().flatMap(mapper);
}

/**
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
* source Single, where that function returns a Publisher.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.*;
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;

/**
* Base class for operators with a source consumable.
*
* @param <T> the input source type
* @param <U> the output type
*/
public abstract class AbstractMaybeWithUpstreamObservable<T, U> extends Maybe<U> implements HasUpstreamObservableSource<T> {

/** The source consumable Observable. */
protected final ObservableSource<T> source;

/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
public AbstractMaybeWithUpstreamObservable(ObservableSource<T> source) {
this.source = source;
}

@Override
public final ObservableSource<T> source() {
return source;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.disposables.*;
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;
import io.reactivex.plugins.RxJavaPlugins;

public final class ObservableCollect<T, U> extends AbstractObservableWithUpstream<T, U> {
public final class ObservableCollect<T, U> extends AbstractSingleWithUpstreamObservable<T, U> {
final Callable<? extends U> initialSupplier;
final BiConsumer<? super U, ? super T> collector;

Expand All @@ -32,7 +33,7 @@ public ObservableCollect(ObservableSource<T> source,
}

@Override
protected void subscribeActual(Observer<? super U> t) {
protected void subscribeActual(SingleObserver<? super U> t) {
U u;
try {
u = initialSupplier.call();
Expand All @@ -51,15 +52,15 @@ protected void subscribeActual(Observer<? super U> t) {
}

static final class CollectSubscriber<T, U> implements Observer<T>, Disposable {
final Observer<? super U> actual;
final SingleObserver<? super U> actual;
final BiConsumer<? super U, ? super T> collector;
final U u;

Disposable s;

boolean done;

public CollectSubscriber(Observer<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
public CollectSubscriber(SingleObserver<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
this.actual = actual;
this.collector = collector;
this.u = u;
Expand Down Expand Up @@ -114,8 +115,7 @@ public void onComplete() {
return;
}
done = true;
actual.onNext(u);
actual.onComplete();
actual.onSuccess(u);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,26 @@
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;

public final class ObservableCount<T> extends AbstractObservableWithUpstream<T, Long> {
public final class ObservableCount<T> extends AbstractSingleWithUpstreamObservable<T, Long> {
public ObservableCount(ObservableSource<T> source) {
super(source);
}

@Override
public void subscribeActual(Observer<? super Long> t) {
public void subscribeActual(SingleObserver<? super Long> t) {
source.subscribe(new CountSubscriber(t));
}

static final class CountSubscriber implements Observer<Object>, Disposable {
final Observer<? super Long> actual;
final SingleObserver<? super Long> actual;

Disposable s;

long count;

public CountSubscriber(Observer<? super Long> actual) {
public CountSubscriber(SingleObserver<? super Long> actual) {
this.actual = actual;
}

Expand Down Expand Up @@ -69,8 +70,7 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
actual.onNext(count);
actual.onComplete();
actual.onSuccess(count);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,33 @@
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.operators.maybe.AbstractMaybeWithUpstreamObservable;

public final class ObservableElementAt<T> extends AbstractObservableWithUpstream<T, T> {
public final class ObservableElementAt<T> extends AbstractMaybeWithUpstreamObservable<T, T> {
final long index;
final T defaultValue;
public ObservableElementAt(ObservableSource<T> source, long index, T defaultValue) {
public ObservableElementAt(ObservableSource<T> source, long index) {
super(source);
this.index = index;
this.defaultValue = defaultValue;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new ElementAtSubscriber<T>(t, index, defaultValue));
public void subscribeActual(MaybeObserver<? super T> t) {
source.subscribe(new ElementAtSubscriber<T>(t, index));
}

static final class ElementAtSubscriber<T> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final MaybeObserver<? super T> actual;
final long index;
final T defaultValue;

Disposable s;

long count;

boolean done;

public ElementAtSubscriber(Observer<? super T> actual, long index, T defaultValue) {
public ElementAtSubscriber(MaybeObserver<? super T> actual, long index) {
this.actual = actual;
this.index = index;
this.defaultValue = defaultValue;
}

@Override
Expand Down Expand Up @@ -76,8 +74,7 @@ public void onNext(T t) {
if (c == index) {
done = true;
s.dispose();
actual.onNext(t);
actual.onComplete();
actual.onSuccess(t);
return;
}
count = c + 1;
Expand All @@ -96,13 +93,7 @@ public void onError(Throwable t) {
public void onComplete() {
if (index <= count && !done) {
done = true;
T v = defaultValue;
if (v == null) {
actual.onError(new IndexOutOfBoundsException());
} else {
actual.onNext(v);
actual.onComplete();
}
actual.onComplete();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,40 @@

package io.reactivex.internal.operators.observable;

import java.util.NoSuchElementException;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.operators.single.AbstractSingleWithUpstreamObservable;

public final class ObservableSingle<T> extends AbstractObservableWithUpstream<T, T> {

public final class ObservableElementAtWithDefualt<T> extends AbstractSingleWithUpstreamObservable<T, T> {
final long index;
final T defaultValue;
public ObservableSingle(ObservableSource<T> source, T defaultValue) {

public ObservableElementAtWithDefualt(ObservableSource<T> source, long index, T defaultValue) {
super(source);
this.index = index;
this.defaultValue = defaultValue;
}

@Override
public void subscribeActual(Observer<? super T> t) {
source.subscribe(new SingleElementSubscriber<T>(t, defaultValue));
public void subscribeActual(SingleObserver<? super T> t) {
source.subscribe(new ElementAtSubscriber<T>(t, index, defaultValue));
}

static final class SingleElementSubscriber<T> implements Observer<T>, Disposable {
final Observer<? super T> actual;
static final class ElementAtSubscriber<T> implements Observer<T>, Disposable {
final SingleObserver<? super T> actual;
final long index;
final T defaultValue;

Disposable s;

T value;
long count;

boolean done;

public SingleElementSubscriber(Observer<? super T> actual, T defaultValue) {
public ElementAtSubscriber(SingleObserver<? super T> actual, long index, T defaultValue) {
this.actual = actual;
this.index = index;
this.defaultValue = defaultValue;
}

Expand Down Expand Up @@ -72,13 +75,14 @@ public void onNext(T t) {
if (done) {
return;
}
if (value != null) {
long c = count;
if (c == index) {
done = true;
s.dispose();
actual.onError(new IllegalArgumentException("Sequence contains more than one element!"));
actual.onSuccess(t);
return;
}
value = t;
count = c + 1;
}

@Override
Expand All @@ -92,20 +96,9 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
return;
}
done = true;
T v = value;
value = null;
if (v == null) {
v = defaultValue;
}
if (v == null) {
actual.onError(new NoSuchElementException());
} else {
actual.onNext(v);
actual.onComplete();
if (index <= count && !done) {
done = true;
actual.onSuccess(defaultValue);
}
}
}
Expand Down
Loading