Skip to content

Commit 666b05c

Browse files
authored
2.x: small cleanup and TCK fix (#4541)
1 parent c030483 commit 666b05c

File tree

13 files changed

+267
-348
lines changed

13 files changed

+267
-348
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -13017,7 +13017,7 @@ public final Flowable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
1301713017
*
1301813018
* @param <V>
1301913019
* the timeout value type (ignored)
13020-
* @param timeoutSelector
13020+
* @param itemTimeoutIndicator
1302113021
* a function that returns a Publisher for each item emitted by the source
1302213022
* Publisher and that determines the timeout window for the subsequent item
1302313023
* @return a Flowable that mirrors the source Publisher, but notifies Subscribers of a
@@ -13027,8 +13027,8 @@ public final Flowable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
1302713027
*/
1302813028
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
1302913029
@SchedulerSupport(SchedulerSupport.NONE)
13030-
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> timeoutSelector) {
13031-
return timeout0(null, timeoutSelector, null);
13030+
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator) {
13031+
return timeout0(null, itemTimeoutIndicator, null);
1303213032
}
1303313033

1303413034
/**
@@ -13052,7 +13052,7 @@ public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>>
1305213052
*
1305313053
* @param <V>
1305413054
* the timeout value type (ignored)
13055-
* @param timeoutSelector
13055+
* @param itemTimeoutIndicator
1305613056
* a function that returns a Publisher, for each item emitted by the source Publisher, that
1305713057
* determines the timeout window for the subsequent item
1305813058
* @param other
@@ -13064,9 +13064,9 @@ public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>>
1306413064
*/
1306513065
@BackpressureSupport(BackpressureKind.FULL)
1306613066
@SchedulerSupport(SchedulerSupport.NONE)
13067-
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> timeoutSelector, Flowable<? extends T> other) {
13067+
public final <V> Flowable<T> timeout(Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator, Flowable<? extends T> other) {
1306813068
ObjectHelper.requireNonNull(other, "other is null");
13069-
return timeout0(null, timeoutSelector, other);
13069+
return timeout0(null, itemTimeoutIndicator, other);
1307013070
}
1307113071

1307213072
/**
@@ -13149,17 +13149,17 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Flowable<? ext
1314913149
* maximum duration between items before a timeout occurs
1315013150
* @param timeUnit
1315113151
* the unit of time that applies to the {@code timeout} argument
13152-
* @param other
13153-
* the Publisher to use as the fallback in case of a timeout
1315413152
* @param scheduler
1315513153
* the {@link Scheduler} to run the timeout timers on
13154+
* @param other
13155+
* the Publisher to use as the fallback in case of a timeout
1315613156
* @return the source Publisher modified so that it will switch to the fallback Publisher in case of a
1315713157
* timeout
1315813158
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1315913159
*/
1316013160
@BackpressureSupport(BackpressureKind.FULL)
1316113161
@SchedulerSupport(SchedulerSupport.CUSTOM)
13162-
public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Flowable<? extends T> other, Scheduler scheduler) {
13162+
public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, Flowable<? extends T> other) {
1316313163
ObjectHelper.requireNonNull(other, "other is null");
1316413164
return timeout0(timeout, timeUnit, other, scheduler);
1316513165
}
@@ -13214,10 +13214,10 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sche
1321413214
* the first timeout value type (ignored)
1321513215
* @param <V>
1321613216
* the subsequent timeout value type (ignored)
13217-
* @param firstTimeoutSelector
13217+
* @param firstTimeoutIndicator
1321813218
* a function that returns a Publisher that determines the timeout window for the first source
1321913219
* item
13220-
* @param timeoutSelector
13220+
* @param itemTimeoutIndicator
1322113221
* a function that returns a Publisher for each item emitted by the source Publisher and that
1322213222
* determines the timeout window in which the subsequent source item must arrive in order to
1322313223
* continue the sequence
@@ -13228,10 +13228,10 @@ public final Flowable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sche
1322813228
*/
1322913229
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
1323013230
@SchedulerSupport(SchedulerSupport.NONE)
13231-
public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTimeoutSelector,
13232-
Function<? super T, ? extends Publisher<V>> timeoutSelector) {
13233-
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
13234-
return timeout0(firstTimeoutSelector, timeoutSelector, null);
13231+
public final <U, V> Flowable<T> timeout(Publisher<U> firstTimeoutIndicator,
13232+
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator) {
13233+
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
13234+
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, null);
1323513235
}
1323613236

1323713237
/**
@@ -13254,10 +13254,10 @@ public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTi
1325413254
* the first timeout value type (ignored)
1325513255
* @param <V>
1325613256
* the subsequent timeout value type (ignored)
13257-
* @param firstTimeoutSelector
13257+
* @param firstTimeoutIndicator
1325813258
* a function that returns a Publisher which determines the timeout window for the first source
1325913259
* item
13260-
* @param timeoutSelector
13260+
* @param itemTimeoutIndicator
1326113261
* a function that returns a Publisher for each item emitted by the source Publisher and that
1326213262
* determines the timeout window in which the subsequent source item must arrive in order to
1326313263
* continue the sequence
@@ -13267,18 +13267,18 @@ public final <U, V> Flowable<T> timeout(Callable<? extends Publisher<U>> firstTi
1326713267
* either the first item emitted by the source Publisher or any subsequent item doesn't arrive
1326813268
* within time windows defined by the timeout selectors
1326913269
* @throws NullPointerException
13270-
* if {@code timeoutSelector} is null
13270+
* if {@code itemTimeoutIndicator} is null
1327113271
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1327213272
*/
1327313273
@BackpressureSupport(BackpressureKind.FULL)
1327413274
@SchedulerSupport(SchedulerSupport.NONE)
1327513275
public final <U, V> Flowable<T> timeout(
13276-
Callable<? extends Publisher<U>> firstTimeoutSelector,
13277-
Function<? super T, ? extends Publisher<V>> timeoutSelector,
13276+
Publisher<U> firstTimeoutIndicator,
13277+
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator,
1327813278
Publisher<? extends T> other) {
13279-
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
13279+
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutSelector is null");
1328013280
ObjectHelper.requireNonNull(other, "other is null");
13281-
return timeout0(firstTimeoutSelector, timeoutSelector, other);
13281+
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, other);
1328213282
}
1328313283

1328413284
private Flowable<T> timeout0(long timeout, TimeUnit timeUnit, Flowable<? extends T> other,
@@ -13289,11 +13289,11 @@ private Flowable<T> timeout0(long timeout, TimeUnit timeUnit, Flowable<? extends
1328913289
}
1329013290

1329113291
private <U, V> Flowable<T> timeout0(
13292-
Callable<? extends Publisher<U>> firstTimeoutSelector,
13293-
Function<? super T, ? extends Publisher<V>> timeoutSelector,
13292+
Publisher<U> firstTimeoutIndicator,
13293+
Function<? super T, ? extends Publisher<V>> itemTimeoutIndicator,
1329413294
Publisher<? extends T> other) {
13295-
ObjectHelper.requireNonNull(timeoutSelector, "timeoutSelector is null");
13296-
return RxJavaPlugins.onAssembly(new FlowableTimeout<T, U, V>(this, firstTimeoutSelector, timeoutSelector, other));
13295+
ObjectHelper.requireNonNull(itemTimeoutIndicator, "itemTimeoutIndicator is null");
13296+
return RxJavaPlugins.onAssembly(new FlowableTimeout<T, U, V>(this, firstTimeoutIndicator, itemTimeoutIndicator, other));
1329713297
}
1329813298

1329913299
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10903,7 +10903,7 @@ public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler schedule
1090310903
*
1090410904
* @param <V>
1090510905
* the timeout value type (ignored)
10906-
* @param timeoutSelector
10906+
* @param itemTimeoutIndicator
1090710907
* a function that returns an ObservableSource for each item emitted by the source
1090810908
* ObservableSource and that determines the timeout window for the subsequent item
1090910909
* @return an Observable that mirrors the source ObservableSource, but notifies observers of a
@@ -10912,8 +10912,8 @@ public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler schedule
1091210912
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1091310913
*/
1091410914
@SchedulerSupport(SchedulerSupport.NONE)
10915-
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> timeoutSelector) {
10916-
return timeout0(null, timeoutSelector, null);
10915+
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator) {
10916+
return timeout0(null, itemTimeoutIndicator, null);
1091710917
}
1091810918

1091910919
/**
@@ -10932,7 +10932,7 @@ public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableS
1093210932
*
1093310933
* @param <V>
1093410934
* the timeout value type (ignored)
10935-
* @param timeoutSelector
10935+
* @param itemTimeoutIndicator
1093610936
* a function that returns an ObservableSource, for each item emitted by the source ObservableSource, that
1093710937
* determines the timeout window for the subsequent item
1093810938
* @param other
@@ -10943,10 +10943,10 @@ public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableS
1094310943
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1094410944
*/
1094510945
@SchedulerSupport(SchedulerSupport.NONE)
10946-
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
10946+
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
1094710947
ObservableSource<? extends T> other) {
1094810948
ObjectHelper.requireNonNull(other, "other is null");
10949-
return timeout0(null, timeoutSelector, other);
10949+
return timeout0(null, itemTimeoutIndicator, other);
1095010950
}
1095110951

1095210952
/**
@@ -11014,16 +11014,16 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSo
1101411014
* maximum duration between items before a timeout occurs
1101511015
* @param timeUnit
1101611016
* the unit of time that applies to the {@code timeout} argument
11017-
* @param other
11018-
* the ObservableSource to use as the fallback in case of a timeout
1101911017
* @param scheduler
1102011018
* the {@link Scheduler} to run the timeout timers on
11019+
* @param other
11020+
* the ObservableSource to use as the fallback in case of a timeout
1102111021
* @return the source ObservableSource modified so that it will switch to the fallback ObservableSource in case of a
1102211022
* timeout
1102311023
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1102411024
*/
1102511025
@SchedulerSupport(SchedulerSupport.CUSTOM)
11026-
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other, Scheduler scheduler) {
11026+
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource<? extends T> other) {
1102711027
ObjectHelper.requireNonNull(other, "other is null");
1102811028
return timeout0(timeout, timeUnit, other, scheduler);
1102911029
}
@@ -11070,10 +11070,10 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
1107011070
* the first timeout value type (ignored)
1107111071
* @param <V>
1107211072
* the subsequent timeout value type (ignored)
11073-
* @param firstTimeoutSelector
11073+
* @param firstTimeoutIndicator
1107411074
* a function that returns an ObservableSource that determines the timeout window for the first source
1107511075
* item
11076-
* @param timeoutSelector
11076+
* @param itemTimeoutIndicator
1107711077
* a function that returns an ObservableSource for each item emitted by the source ObservableSource and that
1107811078
* determines the timeout window in which the subsequent source item must arrive in order to
1107911079
* continue the sequence
@@ -11083,10 +11083,10 @@ public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler sc
1108311083
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1108411084
*/
1108511085
@SchedulerSupport(SchedulerSupport.NONE)
11086-
public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>> firstTimeoutSelector,
11087-
Function<? super T, ? extends ObservableSource<V>> timeoutSelector) {
11088-
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
11089-
return timeout0(firstTimeoutSelector, timeoutSelector, null);
11086+
public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator,
11087+
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator) {
11088+
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
11089+
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, null);
1109011090
}
1109111091

1109211092
/**
@@ -11104,10 +11104,10 @@ public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>
1110411104
* the first timeout value type (ignored)
1110511105
* @param <V>
1110611106
* the subsequent timeout value type (ignored)
11107-
* @param firstTimeoutSelector
11107+
* @param firstTimeoutIndicator
1110811108
* a function that returns an ObservableSource which determines the timeout window for the first source
1110911109
* item
11110-
* @param timeoutSelector
11110+
* @param itemTimeoutIndicator
1111111111
* a function that returns an ObservableSource for each item emitted by the source ObservableSource and that
1111211112
* determines the timeout window in which the subsequent source item must arrive in order to
1111311113
* continue the sequence
@@ -11117,17 +11117,17 @@ public final <U, V> Observable<T> timeout(Callable<? extends ObservableSource<U>
1111711117
* either the first item emitted by the source ObservableSource or any subsequent item doesn't arrive
1111811118
* within time windows defined by the timeout selectors
1111911119
* @throws NullPointerException
11120-
* if {@code timeoutSelector} is null
11120+
* if {@code itemTimeoutIndicator} is null
1112111121
* @see <a href="http://reactivex.io/documentation/operators/timeout.html">ReactiveX operators documentation: Timeout</a>
1112211122
*/
1112311123
@SchedulerSupport(SchedulerSupport.NONE)
1112411124
public final <U, V> Observable<T> timeout(
11125-
Callable<? extends ObservableSource<U>> firstTimeoutSelector,
11126-
Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
11125+
ObservableSource<U> firstTimeoutIndicator,
11126+
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
1112711127
ObservableSource<? extends T> other) {
11128-
ObjectHelper.requireNonNull(firstTimeoutSelector, "firstTimeoutSelector is null");
11128+
ObjectHelper.requireNonNull(firstTimeoutIndicator, "firstTimeoutIndicator is null");
1112911129
ObjectHelper.requireNonNull(other, "other is null");
11130-
return timeout0(firstTimeoutSelector, timeoutSelector, other);
11130+
return timeout0(firstTimeoutIndicator, itemTimeoutIndicator, other);
1113111131
}
1113211132

1113311133
private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other,
@@ -11138,11 +11138,11 @@ private Observable<T> timeout0(long timeout, TimeUnit timeUnit, ObservableSource
1113811138
}
1113911139

1114011140
private <U, V> Observable<T> timeout0(
11141-
Callable<? extends ObservableSource<U>> firstTimeoutSelector,
11142-
Function<? super T, ? extends ObservableSource<V>> timeoutSelector,
11141+
ObservableSource<U> firstTimeoutIndicator,
11142+
Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator,
1114311143
ObservableSource<? extends T> other) {
11144-
ObjectHelper.requireNonNull(timeoutSelector, "timeoutSelector is null");
11145-
return RxJavaPlugins.onAssembly(new ObservableTimeout<T, U, V>(this, firstTimeoutSelector, timeoutSelector, other));
11144+
ObjectHelper.requireNonNull(itemTimeoutIndicator, "itemTimeoutIndicator is null");
11145+
return RxJavaPlugins.onAssembly(new ObservableTimeout<T, U, V>(this, firstTimeoutIndicator, itemTimeoutIndicator, other));
1114611146
}
1114711147

1114811148
/**

0 commit comments

Comments
 (0)