-
Notifications
You must be signed in to change notification settings - Fork 7.6k
2.x: cleanup, bugfixes, coverage 8/27-2 #4434
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1743,16 +1743,10 @@ public static Observable<Long> interval(long initialDelay, long period, TimeUnit | |
*/ | ||
@SchedulerSupport(SchedulerSupport.CUSTOM) | ||
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) { | ||
if (initialDelay < 0) { | ||
initialDelay = 0L; | ||
} | ||
if (period < 0) { | ||
period = 0L; | ||
} | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
|
||
return RxJavaPlugins.onAssembly(new ObservableInterval(initialDelay, period, unit, scheduler)); | ||
return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler)); | ||
} | ||
|
||
/** | ||
|
@@ -1843,17 +1837,10 @@ public static Observable<Long> intervalRange(long start, long count, long initia | |
if (end < 0) { | ||
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE"); | ||
} | ||
|
||
if (initialDelay < 0) { | ||
initialDelay = 0L; | ||
} | ||
if (period < 0) { | ||
period = 0L; | ||
} | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
|
||
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler)); | ||
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler)); | ||
} | ||
|
||
/** | ||
|
@@ -3226,13 +3213,10 @@ public static Observable<Long> timer(long delay, TimeUnit unit) { | |
*/ | ||
@SchedulerSupport(SchedulerSupport.CUSTOM) | ||
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) { | ||
if (delay < 0) { | ||
delay = 0L; | ||
} | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
|
||
return RxJavaPlugins.onAssembly(new ObservableTimer(delay, unit, scheduler)); | ||
return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler)); | ||
} | ||
|
||
/** | ||
|
@@ -4701,12 +4685,8 @@ public final Observable<List<T>> buffer(int count, int skip) { | |
*/ | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) { | ||
if (count <= 0) { | ||
throw new IllegalArgumentException("count > 0 required but it was " + count); | ||
} | ||
if (skip <= 0) { | ||
throw new IllegalArgumentException("skip > 0 required but it was " + count); | ||
} | ||
verifyPositive(count, "count"); | ||
verifyPositive(skip, "skip"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think one of these is wrong because There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Based on other methods in this diff I would guess There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It creates buffers every |
||
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); | ||
return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier)); | ||
} | ||
|
@@ -4965,9 +4945,7 @@ public final <U extends Collection<? super T>> Observable<U> buffer( | |
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null"); | ||
if (count <= 0) { | ||
throw new IllegalArgumentException("count > 0 required but it was " + count); | ||
} | ||
verifyPositive(count, "count"); | ||
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize)); | ||
} | ||
|
||
|
@@ -6529,8 +6507,10 @@ public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscrib | |
*/ | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final Observable<T> doOnTerminate(final Action onTerminate) { | ||
return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate), | ||
onTerminate, Functions.EMPTY_ACTION); | ||
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null"); | ||
return doOnEach(Functions.emptyConsumer(), | ||
Functions.actionConsumer(onTerminate), onTerminate, | ||
Functions.EMPTY_ACTION); | ||
} | ||
|
||
/** | ||
|
@@ -6769,9 +6749,7 @@ public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableS | |
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, | ||
boolean delayErrors, int maxConcurrency, int bufferSize) { | ||
ObjectHelper.requireNonNull(mapper, "mapper is null"); | ||
if (maxConcurrency <= 0) { | ||
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency); | ||
} | ||
verifyPositive(maxConcurrency, "maxConcurrency"); | ||
verifyPositive(bufferSize, "bufferSize"); | ||
if (this instanceof ScalarCallable) { | ||
@SuppressWarnings("unchecked") | ||
|
@@ -8255,7 +8233,7 @@ public final Observable<T> repeat() { | |
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd> | ||
* </dl> | ||
* | ||
* @param count | ||
* @param times | ||
* the number of times the source ObservableSource items are repeated, a count of 0 will yield an empty | ||
* sequence | ||
* @return a Observable that repeats the sequence of items emitted by the source ObservableSource at most | ||
|
@@ -8265,14 +8243,14 @@ public final Observable<T> repeat() { | |
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a> | ||
*/ | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final Observable<T> repeat(long count) { | ||
if (count < 0) { | ||
throw new IllegalArgumentException("count >= 0 required but it was " + count); | ||
public final Observable<T> repeat(long times) { | ||
if (times < 0) { | ||
throw new IllegalArgumentException("times >= 0 required but it was " + times); | ||
} | ||
if (count == 0) { | ||
if (times == 0) { | ||
return empty(); | ||
} | ||
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, count)); | ||
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times)); | ||
} | ||
|
||
/** | ||
|
@@ -8466,9 +8444,7 @@ public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends | |
*/ | ||
@SchedulerSupport(SchedulerSupport.CUSTOM) | ||
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { | ||
if (bufferSize < 0) { | ||
throw new IllegalArgumentException("bufferSize < 0"); | ||
} | ||
verifyPositive(bufferSize, "bufferSize"); | ||
ObjectHelper.requireNonNull(selector, "selector is null"); | ||
return ObservableReplay.multicastSelector( | ||
ObservableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector); | ||
|
@@ -8680,9 +8656,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit | |
*/ | ||
@SchedulerSupport(SchedulerSupport.CUSTOM) | ||
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) { | ||
if (bufferSize < 0) { | ||
throw new IllegalArgumentException("bufferSize < 0"); | ||
} | ||
verifyPositive(bufferSize, "bufferSize"); | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
return ObservableReplay.create(this, time, unit, scheduler, bufferSize); | ||
|
@@ -8861,14 +8835,14 @@ public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> | |
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd> | ||
* </dl> | ||
* | ||
* @param count | ||
* @param times | ||
* number of retry attempts before failing | ||
* @return the source ObservableSource modified with retry logic | ||
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a> | ||
*/ | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final Observable<T> retry(long count) { | ||
return retry(count, Functions.alwaysTrue()); | ||
public final Observable<T> retry(long times) { | ||
return retry(times, Functions.alwaysTrue()); | ||
} | ||
|
||
/** | ||
|
@@ -10108,7 +10082,7 @@ public final <R> Observable<R> switchMapDelayError(Function<? super T, ? extends | |
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final Observable<T> take(long count) { | ||
if (count < 0) { | ||
throw new IllegalArgumentException("count >= required but it was " + count); | ||
throw new IllegalArgumentException("count >= 0 required but it was " + count); | ||
} | ||
return RxJavaPlugins.onAssembly(new ObservableTake<T>(this, count)); | ||
} | ||
|
@@ -11237,9 +11211,7 @@ public final Observable<List<T>> toList() { | |
*/ | ||
@SchedulerSupport(SchedulerSupport.NONE) | ||
public final Observable<List<T>> toList(final int capacityHint) { | ||
if (capacityHint <= 0) { | ||
throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint); | ||
} | ||
verifyPositive(capacityHint, "capacityHint"); | ||
return RxJavaPlugins.onAssembly(new ObservableToList<T, List<T>>(this, capacityHint)); | ||
} | ||
|
||
|
@@ -11501,14 +11473,12 @@ public final Flowable<T> toFlowable(BackpressureStrategy strategy) { | |
Flowable<T> o = new FlowableFromObservable<T>(this); | ||
|
||
switch (strategy) { | ||
case BUFFER: | ||
return o.onBackpressureBuffer(); | ||
case DROP: | ||
return o.onBackpressureDrop(); | ||
case LATEST: | ||
return o.onBackpressureLatest(); | ||
default: | ||
return o; | ||
return o.onBackpressureBuffer(); | ||
} | ||
} | ||
|
||
|
@@ -11821,6 +11791,8 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time | |
*/ | ||
@SchedulerSupport(SchedulerSupport.CUSTOM) | ||
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) { | ||
verifyPositive(timespan, "timespan"); | ||
verifyPositive(timeskip, "timeskip"); | ||
verifyPositive(bufferSize, "bufferSize"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
|
@@ -12052,9 +12024,7 @@ public final Observable<Observable<T>> window( | |
verifyPositive(bufferSize, "bufferSize"); | ||
ObjectHelper.requireNonNull(scheduler, "scheduler is null"); | ||
ObjectHelper.requireNonNull(unit, "unit is null"); | ||
if (count <= 0) { | ||
throw new IllegalArgumentException("count > 0 required but it was " + count); | ||
} | ||
verifyPositive(count, "count"); | ||
return RxJavaPlugins.onAssembly(new ObservableWindowTimed<T>(this, timespan, timespan, unit, scheduler, count, bufferSize, restart)); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surprised we aren't throwing here (and others)