Skip to content

2.x: last Maybe operators #4537

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3532,6 +3532,212 @@ public final <U> Maybe<T> takeUntil(Publisher<U> other) {
return RxJavaPlugins.onAssembly(new MaybeTakeUntilPublisher<T, U>(this, other));
}

/**
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
* item. If the next item isn't emitted within the specified timeout duration starting from its predecessor,
* the resulting Maybe terminates and notifies MaybeObservers of a {@code TimeoutException}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param timeout
* maximum duration between emitted items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument.
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit) {
return timeout(timeout, timeUnit, Schedulers.computation());
}

/**
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
* item. If the next item isn't emitted within the specified timeout duration starting from its predecessor,
* the resulting Maybe begins instead to mirror a fallback MaybeSource.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param timeout
* maximum duration between items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param fallback
* the fallback MaybeSource to use in case of a timeout
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, MaybeSource<? extends T> fallback) {
ObjectHelper.requireNonNull(fallback, "other is null");
return timeout(timeout, timeUnit, Schedulers.computation(), fallback);
}

/**
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
* item using a specified Scheduler. If the next item isn't emitted within the specified timeout duration
* starting from its predecessor, the resulting Maybe begins instead to mirror a fallback MaybeSource.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param timeout
* maximum duration between items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param fallback
* the MaybeSource to use as the fallback in case of a timeout
* @param scheduler
* the {@link Scheduler} to run the timeout timers on
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, MaybeSource<? extends T> fallback) {
ObjectHelper.requireNonNull(fallback, "fallback is null");
return timeout(timer(timeout, timeUnit, scheduler), fallback);
}

/**
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
* item, where this policy is governed on a specified Scheduler. If the next item isn't emitted within the
* specified timeout duration starting from its predecessor, the resulting Maybe terminates and
* notifies MaybeObservers of a {@code TimeoutException}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1s.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param timeout
* maximum duration between items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param scheduler
* the Scheduler to run the timeout timers on
* @return the new Maybe instance
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
return timeout(timer(timeout, timeUnit, scheduler));
}

/**
* If this Maybe source didn't signal an event before the timeoutIndicator MaybeSource signals, a
* TimeoutException is signalled instead.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the value type of the
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
* or onComplete.
* @return the new Maybe instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Maybe<T> timeout(MaybeSource<U> timeoutIndicator) {
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
return RxJavaPlugins.onAssembly(new MaybeTimeoutMaybe<T, U>(this, timeoutIndicator, null));
}

/**
* If the current Maybe source didn't signal an event before the timeoutIndicator MaybeSource signals,
* the current Maybe is cancelled and the {@code fallback} MaybeSource subscribed to
* as a continuation.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the value type of the
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
* or onComplete.
* @param fallback the MaybeSource that is subscribed to if the current Maybe times out
* @return the new Maybe instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Maybe<T> timeout(MaybeSource<U> timeoutIndicator, MaybeSource<? extends T> fallback) {
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
ObjectHelper.requireNonNull(fallback, "fallback is null");
return RxJavaPlugins.onAssembly(new MaybeTimeoutMaybe<T, U>(this, timeoutIndicator, fallback));
}

/**
* If this Maybe source didn't signal an event before the timeoutIndicator Publisher signals, a
* TimeoutException is signalled instead.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code timeoutIndicator} {@link Publisher} is consumed in an unbounded manner and
* is cancelled after its first item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the value type of the
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
* or onComplete.
* @return the new Maybe instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Maybe<T> timeout(Publisher<U> timeoutIndicator) {
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
return RxJavaPlugins.onAssembly(new MaybeTimeoutPublisher<T, U>(this, timeoutIndicator, null));
}

/**
* If the current Maybe source didn't signal an event before the timeoutIndicator Publisher signals,
* the current Maybe is cancelled and the {@code fallback} MaybeSource subscribed to
* as a continuation.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code timeoutIndicator} {@link Publisher} is consumed in an unbounded manner and
* is cancelled after its first item.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <U> the value type of the
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
* or onComplete
* @param fallback the MaybeSource that is subscribed to if the current Maybe times out
* @return the new Maybe instance
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U> Maybe<T> timeout(Publisher<U> timeoutIndicator, MaybeSource<? extends T> fallback) {
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
ObjectHelper.requireNonNull(fallback, "fallback is null");
return RxJavaPlugins.onAssembly(new MaybeTimeoutPublisher<T, U>(this, timeoutIndicator, fallback));
}

/**
* Returns a Maybe which makes sure when a MaybeObserver disposes the Disposable,
* that call is propagated up on the specified scheduler
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
* </dl>
* @param scheduler the target scheduler where to execute the cancellation
* @return the new Maybe instance
* @throws NullPointerException if scheduler is null
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Maybe<T> unsubscribeOn(final Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new MaybeUnsubscribeOn<T>(this, scheduler));
}

/**
* Waits until this and the other MaybeSource signal a success value then applies the given BiFunction
* to those values and emits the BiFunction's resulting value to downstream.
Expand Down
Loading