Skip to content

2.x: add more Maybe operators, fix a few javadoc mistakes #4467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 37 additions & 67 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

589 changes: 572 additions & 17 deletions src/main/java/io/reactivex/Maybe.java

Large diffs are not rendered by default.

116 changes: 43 additions & 73 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

41 changes: 38 additions & 3 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,29 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
* @return the new Flowable instance
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources) {
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), 2, ErrorMode.IMMEDIATE));
return concat(sources, 2);
}


/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by
* a Publisher sequence and prefetched by the specified amount.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the Publisher of SingleSource instances
* @param prefetch the number of SingleSources to prefetch from the Publisher
* @return the new Flowable instance
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources, int prefetch) {
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
}

/**
* Returns a Flowable that emits the items emitted by two Singles, one after the other.
* <p>
Expand Down Expand Up @@ -218,6 +236,23 @@ public static <T> Flowable<T> concat(
return concat(Flowable.fromArray(source1, source2, source3, source4));
}

/**
* Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in
* an array.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param sources the array of SingleSource instances
* @return the new Flowable instance
* @since 2.0
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concatArray(SingleSource<? extends T>... sources) {
return RxJavaPlugins.onAssembly(new FlowableConcatMap(Flowable.fromArray(sources), SingleInternalHelper.toFlowable(), 2, ErrorMode.BOUNDARY));
}

/**
* Provides an API (via a cold Completable) that bridges the reactive world with the callback-style world.
* <p>
Expand Down
61 changes: 0 additions & 61 deletions src/main/java/io/reactivex/disposables/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,6 @@ private Disposables() {
throw new IllegalStateException("No instances!");
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
* @param run the Runnable to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Runnable run) {
return fromRunnable(run);
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
Expand All @@ -55,18 +43,6 @@ public static Disposable fromRunnable(Runnable run) {
return new RunnableDisposable(run);
}

/**
* Construct a Disposable by wrapping a Action that is
* executed exactly once when the Disposable is disposed.
* @param run the Action to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Action run) {
return fromAction(run);
}

/**
* Construct a Disposable by wrapping a Action that is
* executed exactly once when the Disposable is disposed.
Expand All @@ -78,31 +54,6 @@ public static Disposable fromAction(Action run) {
return new ActionDisposable(run);
}

/**
* Construct a Disposable by wrapping a Future that is
* cancelled exactly once when the Disposable is disposed.
* @param future the Future to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Future<?> future) {
return fromFuture(future, true);
}

/**
* Construct a Disposable by wrapping a Runnable that is
* executed exactly once when the Disposable is disposed.
* @param future the Runnable to wrap
* @param allowInterrupt if true, the future cancel happens via Future.cancel(true)
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Future<?> future, boolean allowInterrupt) {
return fromFuture(future, allowInterrupt);
}

/**
* Construct a Disposable by wrapping a Future that is
* cancelled exactly once when the Disposable is disposed.
Expand All @@ -126,18 +77,6 @@ public static Disposable fromFuture(Future<?> future, boolean allowInterrupt) {
return new FutureDisposable(future, allowInterrupt);
}

/**
* Construct a Disposable by wrapping a Subscription that is
* cancelled exactly once when the Disposable is disposed.
* @param subscription the Runnable to wrap
* @return the new Disposable instance
* @deprecated use {@link #fromRunnable(Runnable)} to avoid lambda-ambiguity
*/
@Deprecated
public static Disposable from(Subscription subscription) {
return fromSubscription(subscription);
}

/**
* Construct a Disposable by wrapping a Subscription that is
* cancelled exactly once when the Disposable is disposed.
Expand Down
31 changes: 31 additions & 0 deletions src/main/java/io/reactivex/internal/functions/ObjectHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,35 @@ public boolean test(Object o1, Object o2) {
public static <T> BiPredicate<T, T> equalsPredicate() {
return (BiPredicate<T, T>)EQUALS;
}

/**
* Validate that the given value is positive or report an IllegalArgumentException with
* the parameter name.
* @param value the value to validate
* @param paramName the parameter name of the value
* @return value
* @throws IllegalArgumentException if bufferSize &lt;= 0
*/
public static int verifyPositive(int value, String paramName) {
if (value <= 0) {
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
}
return value;
}

/**
* Validate that the given value is positive or report an IllegalArgumentException with
* the parameter name.
* @param value the value to validate
* @param paramName the parameter name of the value
* @return value
* @throws IllegalArgumentException if bufferSize &lt;= 0
*/
public static long verifyPositive(long value, String paramName) {
if (value <= 0L) {
throw new IllegalArgumentException(paramName + " > 0 required but it was " + value);
}
return value;
}

}
118 changes: 118 additions & 0 deletions src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import java.util.concurrent.atomic.AtomicBoolean;

import io.reactivex.*;
import io.reactivex.disposables.*;
import io.reactivex.plugins.RxJavaPlugins;

/**
* Signals the event of the first MaybeSource that signals.
*
* @param <T> the value type emitted
*/
public final class MaybeAmbArray<T> extends Maybe<T> {

final MaybeSource<? extends T>[] sources;

public MaybeAmbArray(MaybeSource<? extends T>[] sources) {
this.sources = sources;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {

AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
observer.onSubscribe(parent);

for (MaybeSource<? extends T> s : sources) {
if (parent.isDisposed()) {
return;
}

if (s == null) {
parent.onError(new NullPointerException("One of the MaybeSources is null"));
return;
}

s.subscribe(parent);
}
}

static final class AmbMaybeObserver<T>
extends AtomicBoolean
implements MaybeObserver<T>, Disposable {

/** */
private static final long serialVersionUID = -7044685185359438206L;

final MaybeObserver<? super T> actual;

final CompositeDisposable set;

public AmbMaybeObserver(MaybeObserver<? super T> actual) {
this.actual = actual;
this.set = new CompositeDisposable();
}

@Override
public void dispose() {
if (compareAndSet(false, true)) {
set.dispose();
}
}

@Override
public boolean isDisposed() {
return get();
}

@Override
public void onSubscribe(Disposable d) {
set.add(d);
}

@Override
public void onSuccess(T value) {
if (compareAndSet(false, true)) {
set.dispose();

actual.onSuccess(value);
}
}

@Override
public void onError(Throwable e) {
if (compareAndSet(false, true)) {
set.dispose();

actual.onError(e);
} else {
RxJavaPlugins.onError(e);
}
}

@Override
public void onComplete() {
if (compareAndSet(false, true)) {
set.dispose();

actual.onComplete();
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.*;
import io.reactivex.internal.operators.maybe.MaybeAmbArray.AmbMaybeObserver;

/**
* Signals the event of the first MaybeSource that signals.
*
* @param <T> the value type emitted
*/
public final class MaybeAmbIterable<T> extends Maybe<T> {

final Iterable<? extends MaybeSource<? extends T>> sources;

public MaybeAmbIterable(Iterable<? extends MaybeSource<? extends T>> sources) {
this.sources = sources;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
observer.onSubscribe(parent);

int i = 0;
for (MaybeSource<? extends T> s : sources) {
if (parent.isDisposed()) {
return;
}

if (s == null) {
parent.onError(new NullPointerException("One of the MaybeSources is null"));
return;
}

s.subscribe(parent);
i++;
}

if (i == 0) {
observer.onComplete();
}
}
}
Loading