Skip to content

Commit 988cf23

Browse files
authored
2.x: last Maybe operators (#4537)
1 parent 1ebd979 commit 988cf23

File tree

6 files changed

+923
-0
lines changed

6 files changed

+923
-0
lines changed

src/main/java/io/reactivex/Maybe.java

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3532,6 +3532,212 @@ public final <U> Maybe<T> takeUntil(Publisher<U> other) {
35323532
return RxJavaPlugins.onAssembly(new MaybeTakeUntilPublisher<T, U>(this, other));
35333533
}
35343534

3535+
/**
3536+
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
3537+
* item. If the next item isn't emitted within the specified timeout duration starting from its predecessor,
3538+
* the resulting Maybe terminates and notifies MaybeObservers of a {@code TimeoutException}.
3539+
* <p>
3540+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1.png" alt="">
3541+
* <dl>
3542+
* <dt><b>Scheduler:</b></dt>
3543+
* <dd>This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
3544+
* </dl>
3545+
*
3546+
* @param timeout
3547+
* maximum duration between emitted items before a timeout occurs
3548+
* @param timeUnit
3549+
* the unit of time that applies to the {@code timeout} argument.
3550+
* @return the new Maybe instance
3551+
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
3552+
*/
3553+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
3554+
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit) {
3555+
return timeout(timeout, timeUnit, Schedulers.computation());
3556+
}
3557+
3558+
/**
3559+
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
3560+
* item. If the next item isn't emitted within the specified timeout duration starting from its predecessor,
3561+
* the resulting Maybe begins instead to mirror a fallback MaybeSource.
3562+
* <p>
3563+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2.png" alt="">
3564+
* <dl>
3565+
* <dt><b>Scheduler:</b></dt>
3566+
* <dd>This version of {@code timeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
3567+
* </dl>
3568+
*
3569+
* @param timeout
3570+
* maximum duration between items before a timeout occurs
3571+
* @param timeUnit
3572+
* the unit of time that applies to the {@code timeout} argument
3573+
* @param fallback
3574+
* the fallback MaybeSource to use in case of a timeout
3575+
* @return the new Maybe instance
3576+
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
3577+
*/
3578+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
3579+
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, MaybeSource<? extends T> fallback) {
3580+
ObjectHelper.requireNonNull(fallback, "other is null");
3581+
return timeout(timeout, timeUnit, Schedulers.computation(), fallback);
3582+
}
3583+
3584+
/**
3585+
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
3586+
* item using a specified Scheduler. If the next item isn't emitted within the specified timeout duration
3587+
* starting from its predecessor, the resulting Maybe begins instead to mirror a fallback MaybeSource.
3588+
* <p>
3589+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.2s.png" alt="">
3590+
* <dl>
3591+
* <dt><b>Scheduler:</b></dt>
3592+
* <dd>You specify which {@link Scheduler} this operator will use</dd>
3593+
* </dl>
3594+
*
3595+
* @param timeout
3596+
* maximum duration between items before a timeout occurs
3597+
* @param timeUnit
3598+
* the unit of time that applies to the {@code timeout} argument
3599+
* @param fallback
3600+
* the MaybeSource to use as the fallback in case of a timeout
3601+
* @param scheduler
3602+
* the {@link Scheduler} to run the timeout timers on
3603+
* @return the new Maybe instance
3604+
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
3605+
*/
3606+
@SchedulerSupport(SchedulerSupport.CUSTOM)
3607+
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, MaybeSource<? extends T> fallback) {
3608+
ObjectHelper.requireNonNull(fallback, "fallback is null");
3609+
return timeout(timer(timeout, timeUnit, scheduler), fallback);
3610+
}
3611+
3612+
/**
3613+
* Returns a Maybe that mirrors the source Maybe but applies a timeout policy for each emitted
3614+
* item, where this policy is governed on a specified Scheduler. If the next item isn't emitted within the
3615+
* specified timeout duration starting from its predecessor, the resulting Maybe terminates and
3616+
* notifies MaybeObservers of a {@code TimeoutException}.
3617+
* <p>
3618+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/timeout.1s.png" alt="">
3619+
* <dl>
3620+
* <dt><b>Scheduler:</b></dt>
3621+
* <dd>You specify which {@link Scheduler} this operator will use</dd>
3622+
* </dl>
3623+
*
3624+
* @param timeout
3625+
* maximum duration between items before a timeout occurs
3626+
* @param timeUnit
3627+
* the unit of time that applies to the {@code timeout} argument
3628+
* @param scheduler
3629+
* the Scheduler to run the timeout timers on
3630+
* @return the new Maybe instance
3631+
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
3632+
*/
3633+
@SchedulerSupport(SchedulerSupport.CUSTOM)
3634+
public final Maybe<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
3635+
return timeout(timer(timeout, timeUnit, scheduler));
3636+
}
3637+
3638+
/**
3639+
* If this Maybe source didn't signal an event before the timeoutIndicator MaybeSource signals, a
3640+
* TimeoutException is signalled instead.
3641+
* <dl>
3642+
* <dt><b>Scheduler:</b></dt>
3643+
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
3644+
* </dl>
3645+
* @param <U> the value type of the
3646+
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
3647+
* or onComplete.
3648+
* @return the new Maybe instance
3649+
*/
3650+
@SchedulerSupport(SchedulerSupport.NONE)
3651+
public final <U> Maybe<T> timeout(MaybeSource<U> timeoutIndicator) {
3652+
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
3653+
return RxJavaPlugins.onAssembly(new MaybeTimeoutMaybe<T, U>(this, timeoutIndicator, null));
3654+
}
3655+
3656+
/**
3657+
* If the current Maybe source didn't signal an event before the timeoutIndicator MaybeSource signals,
3658+
* the current Maybe is cancelled and the {@code fallback} MaybeSource subscribed to
3659+
* as a continuation.
3660+
* <dl>
3661+
* <dt><b>Scheduler:</b></dt>
3662+
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
3663+
* </dl>
3664+
* @param <U> the value type of the
3665+
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
3666+
* or onComplete.
3667+
* @param fallback the MaybeSource that is subscribed to if the current Maybe times out
3668+
* @return the new Maybe instance
3669+
*/
3670+
@SchedulerSupport(SchedulerSupport.NONE)
3671+
public final <U> Maybe<T> timeout(MaybeSource<U> timeoutIndicator, MaybeSource<? extends T> fallback) {
3672+
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
3673+
ObjectHelper.requireNonNull(fallback, "fallback is null");
3674+
return RxJavaPlugins.onAssembly(new MaybeTimeoutMaybe<T, U>(this, timeoutIndicator, fallback));
3675+
}
3676+
3677+
/**
3678+
* If this Maybe source didn't signal an event before the timeoutIndicator Publisher signals, a
3679+
* TimeoutException is signalled instead.
3680+
* <dl>
3681+
* <dt><b>Backpressure:</b></dt>
3682+
* <dd>The {@code timeoutIndicator} {@link Publisher} is consumed in an unbounded manner and
3683+
* is cancelled after its first item.</dd>
3684+
* <dt><b>Scheduler:</b></dt>
3685+
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
3686+
* </dl>
3687+
* @param <U> the value type of the
3688+
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
3689+
* or onComplete.
3690+
* @return the new Maybe instance
3691+
*/
3692+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
3693+
@SchedulerSupport(SchedulerSupport.NONE)
3694+
public final <U> Maybe<T> timeout(Publisher<U> timeoutIndicator) {
3695+
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
3696+
return RxJavaPlugins.onAssembly(new MaybeTimeoutPublisher<T, U>(this, timeoutIndicator, null));
3697+
}
3698+
3699+
/**
3700+
* If the current Maybe source didn't signal an event before the timeoutIndicator Publisher signals,
3701+
* the current Maybe is cancelled and the {@code fallback} MaybeSource subscribed to
3702+
* as a continuation.
3703+
* <dl>
3704+
* <dt><b>Backpressure:</b></dt>
3705+
* <dd>The {@code timeoutIndicator} {@link Publisher} is consumed in an unbounded manner and
3706+
* is cancelled after its first item.</dd>
3707+
* <dt><b>Scheduler:</b></dt>
3708+
* <dd>{@code timeout} does not operate by default on a particular {@link Scheduler}.</dd>
3709+
* </dl>
3710+
* @param <U> the value type of the
3711+
* @param timeoutIndicator the MaybeSource that indicates the timeout by signalling onSuccess
3712+
* or onComplete
3713+
* @param fallback the MaybeSource that is subscribed to if the current Maybe times out
3714+
* @return the new Maybe instance
3715+
*/
3716+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
3717+
@SchedulerSupport(SchedulerSupport.NONE)
3718+
public final <U> Maybe<T> timeout(Publisher<U> timeoutIndicator, MaybeSource<? extends T> fallback) {
3719+
ObjectHelper.requireNonNull(timeoutIndicator, "timoutIndicator is null");
3720+
ObjectHelper.requireNonNull(fallback, "fallback is null");
3721+
return RxJavaPlugins.onAssembly(new MaybeTimeoutPublisher<T, U>(this, timeoutIndicator, fallback));
3722+
}
3723+
3724+
/**
3725+
* Returns a Maybe which makes sure when a MaybeObserver disposes the Disposable,
3726+
* that call is propagated up on the specified scheduler
3727+
* <dl>
3728+
* <dt><b>Scheduler:</b></dt>
3729+
* <dd>{@code unsubscribeOn} calls dispose() of the upstream on the {@link Scheduler} you specify.</dd>
3730+
* </dl>
3731+
* @param scheduler the target scheduler where to execute the cancellation
3732+
* @return the new Maybe instance
3733+
* @throws NullPointerException if scheduler is null
3734+
*/
3735+
@SchedulerSupport(SchedulerSupport.CUSTOM)
3736+
public final Maybe<T> unsubscribeOn(final Scheduler scheduler) {
3737+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
3738+
return RxJavaPlugins.onAssembly(new MaybeUnsubscribeOn<T>(this, scheduler));
3739+
}
3740+
35353741
/**
35363742
* Waits until this and the other MaybeSource signal a success value then applies the given BiFunction
35373743
* to those values and emits the BiFunction's resulting value to downstream.

0 commit comments

Comments
 (0)