Skip to content

2.x: small cleanup and TCK fix #4541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 13, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 26 additions & 26 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13017,7 +13017,7 @@ public final Flowable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
*
* @param <V>
* the timeout value type (ignored)
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns a Publisher for each item emitted by the source
* Publisher and that determines the timeout window for the subsequent item
* @return a Flowable that mirrors the source Publisher, but notifies Subscribers of a
Expand All @@ -13027,8 +13027,8 @@ public final Flowable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
*/
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> timeoutSelector) {
return timeout0(null, timeoutSelector, null);
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator) {
return timeout0(null, itemTimeoutIndicator, null);
}

/**
Expand All @@ -13052,7 +13052,7 @@ public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>>
*
* @param <V>
* the timeout value type (ignored)
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns a Publisher, for each item emitted by the source Publisher, that
* determines the timeout window for the subsequent item
* @param other
Expand All @@ -13064,9 +13064,9 @@ public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> timeoutSelector, Flowable<? extends T> other) {
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator, Flowable<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(null, timeoutSelector, other);
return timeout0(null, itemTimeoutIndicator, other);
}

/**
Expand Down Expand Up @@ -13149,17 +13149,17 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Flowable<? ext
* maximum duration between items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param other
* the Publisher to use as the fallback in case of a timeout
* @param scheduler
* the {@link Scheduler} to run the timeout timers on
* @param other
* the Publisher to use as the fallback in case of a timeout
* @return the source Publisher modified so that it will switch to the fallback Publisher in case of a
* timeout
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Flowable<? extends T> other, Scheduler scheduler) {
public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, Flowable<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, timeUnit, other, scheduler);
}
Expand Down Expand Up @@ -13214,10 +13214,10 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sche
* the first timeout value type (ignored)
* @param <V>
* the subsequent timeout value type (ignored)
* @param firstTimeoutSelector
* @param firstTimeoutIndicator
* a function that returns a Publisher that determines the timeout window for the first source
* item
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns a Publisher for each item emitted by the source Publisher and that
* determines the timeout window in which the subsequent source item must arrive in order to
* continue the sequence
Expand All @@ -13228,10 +13228,10 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sche
*/
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTimeoutSelector,
Function<? super T, ? extends Publisher<V>> timeoutSelector) {
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
return timeout0(firstTimeoutSelector, timeoutSelector, null);
public final <U, V> Flowable<T> timeout(Publisher<U> firstTimeoutIndicator,
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator) {
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, null);
}

/**
Expand All @@ -13254,10 +13254,10 @@ public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTi
* the first timeout value type (ignored)
* @param <V>
* the subsequent timeout value type (ignored)
* @param firstTimeoutSelector
* @param firstTimeoutIndicator
* a function that returns a Publisher which determines the timeout window for the first source
* item
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns a Publisher for each item emitted by the source Publisher and that
* determines the timeout window in which the subsequent source item must arrive in order to
* continue the sequence
Expand All @@ -13267,18 +13267,18 @@ public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTi
* either the first item emitted by the source Publisher or any subsequent item doesn't arrive
* within time windows defined by the timeout selectors
* @throws NullPointerException
* if {@code timeoutSelector} is null
* if {@code itemTimeoutIndicator} is null
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, V> Flowable<T> timeout(
Callable<? extends Publisher<U>> firstTimeoutSelector,
Function<? super T, ? extends Publisher<V>> timeoutSelector,
Publisher<U> firstTimeoutIndicator,
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator,
Publisher<? extends T> other) {
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutSelector is null");
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(firstTimeoutSelector, timeoutSelector, other);
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, other);
}

private Flowable<T> timeout0(long timeout, TimeUnit timeUnit, Flowable<? extends T> other,
Expand All @@ -13289,11 +13289,11 @@ private Flowable<T> timeout0(long timeout, TimeUnit timeUnit, Flowable<? extends
}

private <U, V> Flowable<T> timeout0(
Callable<? extends Publisher<U>> firstTimeoutSelector,
Function<? super T, ? extends Publisher<V>> timeoutSelector,
Publisher<U> firstTimeoutIndicator,
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator,
Publisher<? extends T> other) {
ObjectHelper.requireNonNull(timeoutSelector, "timeoutSelector is null");
return RxJavaPlugins.onAssembly(new FlowableTimeout<T, U, V>(this, firstTimeoutSelector, timeoutSelector, other));
ObjectHelper.requireNonNull(itemTimeoutIndicator, "itemTimeoutIndicator is null");
return RxJavaPlugins.onAssembly(new FlowableTimeout<T, U, V>(this, firstTimeoutIndicator, itemTimeoutIndicator, other));
}

/**
Expand Down
52 changes: 26 additions & 26 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10903,7 +10903,7 @@ public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler schedule
*
* @param <V>
* the timeout value type (ignored)
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns an ObservableSource for each item emitted by the source
* ObservableSource and that determines the timeout window for the subsequent item
* @return an Observable that mirrors the source ObservableSource, but notifies observers of a
Expand All @@ -10912,8 +10912,8 @@ public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler schedule
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> timeoutSelector) {
return timeout0(null, timeoutSelector, null);
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator) {
return timeout0(null, itemTimeoutIndicator, null);
}

/**
Expand All @@ -10932,7 +10932,7 @@ public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableS
*
* @param <V>
* the timeout value type (ignored)
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns an ObservableSource, for each item emitted by the source ObservableSource, that
* determines the timeout window for the subsequent item
* @param other
Expand All @@ -10943,10 +10943,10 @@ public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableS
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(null, timeoutSelector, other);
return timeout0(null, itemTimeoutIndicator, other);
}

/**
Expand Down Expand Up @@ -11014,16 +11014,16 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSo
* maximum duration between items before a timeout occurs
* @param timeUnit
* the unit of time that applies to the {@code timeout} argument
* @param other
* the ObservableSource to use as the fallback in case of a timeout
* @param scheduler
* the {@link Scheduler} to run the timeout timers on
* @param other
* the ObservableSource to use as the fallback in case of a timeout
* @return the source ObservableSource modified so that it will switch to the fallback ObservableSource in case of a
* timeout
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other, Scheduler scheduler) {
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(timeout, timeUnit, other, scheduler);
}
Expand Down Expand Up @@ -11070,10 +11070,10 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
* the first timeout value type (ignored)
* @param <V>
* the subsequent timeout value type (ignored)
* @param firstTimeoutSelector
* @param firstTimeoutIndicator
* a function that returns an ObservableSource that determines the timeout window for the first source
* item
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns an ObservableSource for each item emitted by the source ObservableSource and that
* determines the timeout window in which the subsequent source item must arrive in order to
* continue the sequence
Expand All @@ -11083,10 +11083,10 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>> firstTimeoutSelector,
Function<? super T, ? extends ObservableSource<V>> timeoutSelector) {
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
return timeout0(firstTimeoutSelector, timeoutSelector, null);
public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator) {
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, null);
}

/**
Expand All @@ -11104,10 +11104,10 @@ public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>
* the first timeout value type (ignored)
* @param <V>
* the subsequent timeout value type (ignored)
* @param firstTimeoutSelector
* @param firstTimeoutIndicator
* a function that returns an ObservableSource which determines the timeout window for the first source
* item
* @param timeoutSelector
* @param itemTimeoutIndicator
* a function that returns an ObservableSource for each item emitted by the source ObservableSource and that
* determines the timeout window in which the subsequent source item must arrive in order to
* continue the sequence
Expand All @@ -11117,17 +11117,17 @@ public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>
* either the first item emitted by the source ObservableSource or any subsequent item doesn't arrive
* within time windows defined by the timeout selectors
* @throws NullPointerException
* if {@code timeoutSelector} is null
* if {@code itemTimeoutIndicator} is null
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U, V> Observable<T> timeout(
Callable<? extends ObservableSource<U>> firstTimeoutSelector,
Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
ObservableSource<U> firstTimeoutIndicator,
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
ObjectHelper.requireNonNull(other, "other is null");
return timeout0(firstTimeoutSelector, timeoutSelector, other);
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, other);
}

private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other,
Expand All @@ -11138,11 +11138,11 @@ private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource
}

private <U, V> Observable<T> timeout0(
Callable<? extends ObservableSource<U>> firstTimeoutSelector,
Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
ObservableSource<U> firstTimeoutIndicator,
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
ObservableSource<? extends T> other) {
ObjectHelper.requireNonNull(timeoutSelector, "timeoutSelector is null");
return RxJavaPlugins.onAssembly(new ObservableTimeout<T, U, V>(this, firstTimeoutSelector, timeoutSelector, other));
ObjectHelper.requireNonNull(itemTimeoutIndicator, "itemTimeoutIndicator is null");
return RxJavaPlugins.onAssembly(new ObservableTimeout<T, U, V>(this, firstTimeoutIndicator, itemTimeoutIndicator, other));
}

/**
Expand Down
Loading