Skip to content

2.x: cleanup, bugfixes, coverage 8/27-2 #4434

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,15 +225,7 @@ public static Completable unsafeCreate(CompletableSource source) {
if (source instanceof Completable) {
throw new IllegalArgumentException("Use of unsafeCreate(Completable)!");
}
try {
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
}

/**
Expand Down
86 changes: 28 additions & 58 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1743,16 +1743,10 @@ public static Observable<Long> interval(long initialDelay, long period, TimeUnit
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
if (initialDelay < 0) {
initialDelay = 0L;
}
if (period < 0) {
period = 0L;
}
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprised we aren't throwing here (and others)


return RxJavaPlugins.onAssembly(new ObservableInterval(initialDelay, period, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

/**
Expand Down Expand Up @@ -1843,17 +1837,10 @@ public static Observable<Long> intervalRange(long start, long count, long initia
if (end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}

if (initialDelay < 0) {
initialDelay = 0L;
}
if (period < 0) {
period = 0L;
}
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}

/**
Expand Down Expand Up @@ -3226,13 +3213,10 @@ public static Observable<Long> timer(long delay, TimeUnit unit) {
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
if (delay < 0) {
delay = 0L;
}
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");

return RxJavaPlugins.onAssembly(new ObservableTimer(delay, unit, scheduler));
return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
}

/**
Expand Down Expand Up @@ -4701,12 +4685,8 @@ public final Observable<List<T>> buffer(int count, int skip) {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
if (count <= 0) {
throw new IllegalArgumentException("count > 0 required but it was " + count);
}
if (skip <= 0) {
throw new IllegalArgumentException("skip > 0 required but it was " + count);
}
verifyPositive(count, "count");
verifyPositive(skip, "skip");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think one of these is wrong because skip can be 0 but count cannot.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on other methods in this diff I would guess verifyPositive rejects 0

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It creates buffers every index % skip so zero not allowed.

ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));
}
Expand Down Expand Up @@ -4965,9 +4945,7 @@ public final <U extends Collection<? super T>> Observable<U> buffer(
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
if (count <= 0) {
throw new IllegalArgumentException("count > 0 required but it was " + count);
}
verifyPositive(count, "count");
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize));
}

Expand Down Expand Up @@ -6529,8 +6507,10 @@ public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscrib
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnTerminate(final Action onTerminate) {
return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate),
onTerminate, Functions.EMPTY_ACTION);
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
return doOnEach(Functions.emptyConsumer(),
Functions.actionConsumer(onTerminate), onTerminate,
Functions.EMPTY_ACTION);
}

/**
Expand Down Expand Up @@ -6769,9 +6749,7 @@ public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableS
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
if (maxConcurrency <= 0) {
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
}
verifyPositive(maxConcurrency, "maxConcurrency");
verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -8255,7 +8233,7 @@ public final Observable<T> repeat() {
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param count
* @param times
* the number of times the source ObservableSource items are repeated, a count of 0 will yield an empty
* sequence
* @return a Observable that repeats the sequence of items emitted by the source ObservableSource at most
Expand All @@ -8265,14 +8243,14 @@ public final Observable<T> repeat() {
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> repeat(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
public final Observable<T> repeat(long times) {
if (times < 0) {
throw new IllegalArgumentException("times >= 0 required but it was " + times);
}
if (count == 0) {
if (times == 0) {
return empty();
}
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, count));
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
}

/**
Expand Down Expand Up @@ -8466,9 +8444,7 @@ public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(selector, "selector is null");
return ObservableReplay.multicastSelector(
ObservableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector);
Expand Down Expand Up @@ -8680,9 +8656,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
if (bufferSize < 0) {
throw new IllegalArgumentException("bufferSize < 0");
}
verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return ObservableReplay.create(this, time, unit, scheduler, bufferSize);
Expand Down Expand Up @@ -8861,14 +8835,14 @@ public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable>
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param count
* @param times
* number of retry attempts before failing
* @return the source ObservableSource modified with retry logic
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> retry(long count) {
return retry(count, Functions.alwaysTrue());
public final Observable<T> retry(long times) {
return retry(times, Functions.alwaysTrue());
}

/**
Expand Down Expand Up @@ -10108,7 +10082,7 @@ public final <R> Observable<R> switchMapDelayError(Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> take(long count) {
if (count < 0) {
throw new IllegalArgumentException("count >= required but it was " + count);
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}
return RxJavaPlugins.onAssembly(new ObservableTake<T>(this, count));
}
Expand Down Expand Up @@ -11237,9 +11211,7 @@ public final Observable<List<T>> toList() {
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<List<T>> toList(final int capacityHint) {
if (capacityHint <= 0) {
throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
}
verifyPositive(capacityHint, "capacityHint");
return RxJavaPlugins.onAssembly(new ObservableToList<T, List<T>>(this, capacityHint));
}

Expand Down Expand Up @@ -11501,14 +11473,12 @@ public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
Flowable<T> o = new FlowableFromObservable<T>(this);

switch (strategy) {
case BUFFER:
return o.onBackpressureBuffer();
case DROP:
return o.onBackpressureDrop();
case LATEST:
return o.onBackpressureLatest();
default:
return o;
return o.onBackpressureBuffer();
}
}

Expand Down Expand Up @@ -11821,6 +11791,8 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) {
verifyPositive(timespan, "timespan");
verifyPositive(timeskip, "timeskip");
verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.requireNonNull(unit, "unit is null");
Expand Down Expand Up @@ -12052,9 +12024,7 @@ public final Observable<Observable<T>> window(
verifyPositive(bufferSize, "bufferSize");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.requireNonNull(unit, "unit is null");
if (count <= 0) {
throw new IllegalArgumentException("count > 0 required but it was " + count);
}
verifyPositive(count, "count");
return RxJavaPlugins.onAssembly(new ObservableWindowTimed<T>(this, timespan, timespan, unit, scheduler, count, bufferSize, restart));
}

Expand Down
90 changes: 56 additions & 34 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public static abstract class Worker implements Disposable {

/**
* Schedules a Runnable for execution without delay.
*
Expand Down Expand Up @@ -236,38 +235,8 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

first.replace(schedule(new Runnable() {
long count;
long lastNowNanoseconds = firstNowNanoseconds;
long startInNanoseconds = firstStartInNanoseconds;
@Override
public void run() {
decoratedRun.run();

if (!sd.isDisposed()) {

long nextTick;

long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;

long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
}, initialDelay, unit));
first.replace(schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit));

return sd;
}
Expand All @@ -281,7 +250,60 @@ public void run() {
public long now(TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}


/**
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
final long firstStartInNanoseconds;
final Runnable decoratedRun;
final long firstNowNanoseconds;
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;

PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun,
long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) {
this.firstStartInNanoseconds = firstStartInNanoseconds;
this.decoratedRun = decoratedRun;
this.firstNowNanoseconds = firstNowNanoseconds;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
lastNowNanoseconds = firstNowNanoseconds;
startInNanoseconds = firstStartInNanoseconds;
}

@Override
public void run() {
decoratedRun.run();

if (!sd.isDisposed()) {

long nextTick;

long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;

long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
}
}

static class PeriodicDirectTask
Expand Down
14 changes: 2 additions & 12 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public static <T, U> Single<T> using(
* @param source the source to wrap
* @return the Single wrapper or the source cast to Single (if possible)
*/
static <T> Single<T> wrap(SingleSource<T> source) {
public static <T> Single<T> wrap(SingleSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Single) {
return RxJavaPlugins.onAssembly((Single<T>)source);
Expand Down Expand Up @@ -1824,7 +1824,7 @@ public final Single<T> onErrorReturn(final Function<Throwable, ? extends T> resu
* @return the new Single instance
* @since 2.0
*/
public final Single<T> onErrorReturnValue(final T value) {
public final Single<T> onErrorReturnItem(final T value) {
ObjectHelper.requireNonNull(value, "value is null");
return RxJavaPlugins.onAssembly(new SingleOnErrorReturn<T>(this, null, value));
}
Expand Down Expand Up @@ -2036,16 +2036,6 @@ public final Single<T> retryWhen(Function<? super Flowable<? extends Throwable>,
return toFlowable().retryWhen(handler).toSingle();
}

/**
* Subscribes the given Reactive-Streams Subscriber to this Single with a safety wrapper
* that handles exceptions thrown from the Subscriber's onXXX methods.
* @param s the Subscriber to wrap and subscribe to the current Single
* @since 2.0
*/
public final void safeSubscribe(Subscriber<? super T> s) {
toFlowable().safeSubscribe(s);
}

/**
* Subscribes to a Single but ignore its emission or notification.
* <dl>
Expand Down
Loading