Skip to content

Commit 25e78c5

Browse files
authored
2.x: cleanup, bugfixes, coverage 8/27-2 (#4434)
1 parent 70d36fb commit 25e78c5

File tree

85 files changed

+3667
-166
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+3667
-166
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -225,15 +225,7 @@ public static Completable unsafeCreate(CompletableSource source) {
225225
if (source instanceof Completable) {
226226
throw new IllegalArgumentException("Use of unsafeCreate(Completable)!");
227227
}
228-
try {
229-
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
230-
} catch (NullPointerException ex) { // NOPMD
231-
throw ex;
232-
} catch (Throwable ex) {
233-
Exceptions.throwIfFatal(ex);
234-
RxJavaPlugins.onError(ex);
235-
throw toNpe(ex);
236-
}
228+
return RxJavaPlugins.onAssembly(new CompletableFromUnsafeSource(source));
237229
}
238230

239231
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 28 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1743,16 +1743,10 @@ public static Observable<Long> interval(long initialDelay, long period, TimeUnit
17431743
*/
17441744
@SchedulerSupport(SchedulerSupport.CUSTOM)
17451745
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
1746-
if (initialDelay < 0) {
1747-
initialDelay = 0L;
1748-
}
1749-
if (period < 0) {
1750-
period = 0L;
1751-
}
17521746
ObjectHelper.requireNonNull(unit, "unit is null");
17531747
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
17541748

1755-
return RxJavaPlugins.onAssembly(new ObservableInterval(initialDelay, period, unit, scheduler));
1749+
return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
17561750
}
17571751

17581752
/**
@@ -1843,17 +1837,10 @@ public static Observable<Long> intervalRange(long start, long count, long initia
18431837
if (end < 0) {
18441838
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
18451839
}
1846-
1847-
if (initialDelay < 0) {
1848-
initialDelay = 0L;
1849-
}
1850-
if (period < 0) {
1851-
period = 0L;
1852-
}
18531840
ObjectHelper.requireNonNull(unit, "unit is null");
18541841
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
18551842

1856-
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, initialDelay, period, unit, scheduler));
1843+
return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
18571844
}
18581845

18591846
/**
@@ -3226,13 +3213,10 @@ public static Observable<Long> timer(long delay, TimeUnit unit) {
32263213
*/
32273214
@SchedulerSupport(SchedulerSupport.CUSTOM)
32283215
public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
3229-
if (delay < 0) {
3230-
delay = 0L;
3231-
}
32323216
ObjectHelper.requireNonNull(unit, "unit is null");
32333217
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
32343218

3235-
return RxJavaPlugins.onAssembly(new ObservableTimer(delay, unit, scheduler));
3219+
return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
32363220
}
32373221

32383222
/**
@@ -4701,12 +4685,8 @@ public final Observable<List<T>> buffer(int count, int skip) {
47014685
*/
47024686
@SchedulerSupport(SchedulerSupport.NONE)
47034687
public final <U extends Collection<? super T>> Observable<U> buffer(int count, int skip, Callable<U> bufferSupplier) {
4704-
if (count <= 0) {
4705-
throw new IllegalArgumentException("count > 0 required but it was " + count);
4706-
}
4707-
if (skip <= 0) {
4708-
throw new IllegalArgumentException("skip > 0 required but it was " + count);
4709-
}
4688+
verifyPositive(count, "count");
4689+
verifyPositive(skip, "skip");
47104690
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
47114691
return RxJavaPlugins.onAssembly(new ObservableBuffer<T, U>(this, count, skip, bufferSupplier));
47124692
}
@@ -4965,9 +4945,7 @@ public final <U extends Collection<? super T>> Observable<U> buffer(
49654945
ObjectHelper.requireNonNull(unit, "unit is null");
49664946
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
49674947
ObjectHelper.requireNonNull(bufferSupplier, "bufferSupplier is null");
4968-
if (count <= 0) {
4969-
throw new IllegalArgumentException("count > 0 required but it was " + count);
4970-
}
4948+
verifyPositive(count, "count");
49714949
return RxJavaPlugins.onAssembly(new ObservableBufferTimed<T, U>(this, timespan, timespan, unit, scheduler, bufferSupplier, count, restartTimerOnMaxSize));
49724950
}
49734951

@@ -6529,8 +6507,10 @@ public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscrib
65296507
*/
65306508
@SchedulerSupport(SchedulerSupport.NONE)
65316509
public final Observable<T> doOnTerminate(final Action onTerminate) {
6532-
return doOnEach(Functions.emptyConsumer(), Functions.actionConsumer(onTerminate),
6533-
onTerminate, Functions.EMPTY_ACTION);
6510+
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
6511+
return doOnEach(Functions.emptyConsumer(),
6512+
Functions.actionConsumer(onTerminate), onTerminate,
6513+
Functions.EMPTY_ACTION);
65346514
}
65356515

65366516
/**
@@ -6769,9 +6749,7 @@ public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableS
67696749
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
67706750
boolean delayErrors, int maxConcurrency, int bufferSize) {
67716751
ObjectHelper.requireNonNull(mapper, "mapper is null");
6772-
if (maxConcurrency <= 0) {
6773-
throw new IllegalArgumentException("maxConcurrency > 0 required but it was " + maxConcurrency);
6774-
}
6752+
verifyPositive(maxConcurrency, "maxConcurrency");
67756753
verifyPositive(bufferSize, "bufferSize");
67766754
if (this instanceof ScalarCallable) {
67776755
@SuppressWarnings("unchecked")
@@ -8255,7 +8233,7 @@ public final Observable<T> repeat() {
82558233
* <dd>{@code repeat} does not operate by default on a particular {@link Scheduler}.</dd>
82568234
* </dl>
82578235
*
8258-
* @param count
8236+
* @param times
82598237
* the number of times the source ObservableSource items are repeated, a count of 0 will yield an empty
82608238
* sequence
82618239
* @return a Observable that repeats the sequence of items emitted by the source ObservableSource at most
@@ -8265,14 +8243,14 @@ public final Observable<T> repeat() {
82658243
* @see <a href="http://reactivex.io/documentation/operators/repeat.html">ReactiveX operators documentation: Repeat</a>
82668244
*/
82678245
@SchedulerSupport(SchedulerSupport.NONE)
8268-
public final Observable<T> repeat(long count) {
8269-
if (count < 0) {
8270-
throw new IllegalArgumentException("count >= 0 required but it was " + count);
8246+
public final Observable<T> repeat(long times) {
8247+
if (times < 0) {
8248+
throw new IllegalArgumentException("times >= 0 required but it was " + times);
82718249
}
8272-
if (count == 0) {
8250+
if (times == 0) {
82738251
return empty();
82748252
}
8275-
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, count));
8253+
return RxJavaPlugins.onAssembly(new ObservableRepeat<T>(this, times));
82768254
}
82778255

82788256
/**
@@ -8466,9 +8444,7 @@ public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends
84668444
*/
84678445
@SchedulerSupport(SchedulerSupport.CUSTOM)
84688446
public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
8469-
if (bufferSize < 0) {
8470-
throw new IllegalArgumentException("bufferSize < 0");
8471-
}
8447+
verifyPositive(bufferSize, "bufferSize");
84728448
ObjectHelper.requireNonNull(selector, "selector is null");
84738449
return ObservableReplay.multicastSelector(
84748450
ObservableInternalHelper.replayCallable(this, bufferSize, time, unit, scheduler), selector);
@@ -8680,9 +8656,7 @@ public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit
86808656
*/
86818657
@SchedulerSupport(SchedulerSupport.CUSTOM)
86828658
public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler) {
8683-
if (bufferSize < 0) {
8684-
throw new IllegalArgumentException("bufferSize < 0");
8685-
}
8659+
verifyPositive(bufferSize, "bufferSize");
86868660
ObjectHelper.requireNonNull(unit, "unit is null");
86878661
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
86888662
return ObservableReplay.create(this, time, unit, scheduler, bufferSize);
@@ -8861,14 +8835,14 @@ public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable>
88618835
* <dd>{@code retry} does not operate by default on a particular {@link Scheduler}.</dd>
88628836
* </dl>
88638837
*
8864-
* @param count
8838+
* @param times
88658839
* number of retry attempts before failing
88668840
* @return the source ObservableSource modified with retry logic
88678841
* @see <a href="http://reactivex.io/documentation/operators/retry.html">ReactiveX operators documentation: Retry</a>
88688842
*/
88698843
@SchedulerSupport(SchedulerSupport.NONE)
8870-
public final Observable<T> retry(long count) {
8871-
return retry(count, Functions.alwaysTrue());
8844+
public final Observable<T> retry(long times) {
8845+
return retry(times, Functions.alwaysTrue());
88728846
}
88738847

88748848
/**
@@ -10108,7 +10082,7 @@ public final <R> Observable<R> switchMapDelayError(Function<? super T, ? extends
1010810082
@SchedulerSupport(SchedulerSupport.NONE)
1010910083
public final Observable<T> take(long count) {
1011010084
if (count < 0) {
10111-
throw new IllegalArgumentException("count >= required but it was " + count);
10085+
throw new IllegalArgumentException("count >= 0 required but it was " + count);
1011210086
}
1011310087
return RxJavaPlugins.onAssembly(new ObservableTake<T>(this, count));
1011410088
}
@@ -11237,9 +11211,7 @@ public final Observable<List<T>> toList() {
1123711211
*/
1123811212
@SchedulerSupport(SchedulerSupport.NONE)
1123911213
public final Observable<List<T>> toList(final int capacityHint) {
11240-
if (capacityHint <= 0) {
11241-
throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
11242-
}
11214+
verifyPositive(capacityHint, "capacityHint");
1124311215
return RxJavaPlugins.onAssembly(new ObservableToList<T, List<T>>(this, capacityHint));
1124411216
}
1124511217

@@ -11501,14 +11473,12 @@ public final Flowable<T> toFlowable(BackpressureStrategy strategy) {
1150111473
Flowable<T> o = new FlowableFromObservable<T>(this);
1150211474

1150311475
switch (strategy) {
11504-
case BUFFER:
11505-
return o.onBackpressureBuffer();
1150611476
case DROP:
1150711477
return o.onBackpressureDrop();
1150811478
case LATEST:
1150911479
return o.onBackpressureLatest();
1151011480
default:
11511-
return o;
11481+
return o.onBackpressureBuffer();
1151211482
}
1151311483
}
1151411484

@@ -11821,6 +11791,8 @@ public final Observable<Observable<T>> window(long timespan, long timeskip, Time
1182111791
*/
1182211792
@SchedulerSupport(SchedulerSupport.CUSTOM)
1182311793
public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit, Scheduler scheduler, int bufferSize) {
11794+
verifyPositive(timespan, "timespan");
11795+
verifyPositive(timeskip, "timeskip");
1182411796
verifyPositive(bufferSize, "bufferSize");
1182511797
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
1182611798
ObjectHelper.requireNonNull(unit, "unit is null");
@@ -12052,9 +12024,7 @@ public final Observable<Observable<T>> window(
1205212024
verifyPositive(bufferSize, "bufferSize");
1205312025
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
1205412026
ObjectHelper.requireNonNull(unit, "unit is null");
12055-
if (count <= 0) {
12056-
throw new IllegalArgumentException("count > 0 required but it was " + count);
12057-
}
12027+
verifyPositive(count, "count");
1205812028
return RxJavaPlugins.onAssembly(new ObservableWindowTimed<T>(this, timespan, timespan, unit, scheduler, count, bufferSize, restart));
1205912029
}
1206012030

src/main/java/io/reactivex/Scheduler.java

Lines changed: 56 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, lo
174174
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resource cleanup.
175175
*/
176176
public static abstract class Worker implements Disposable {
177-
178177
/**
179178
* Schedules a Runnable for execution without delay.
180179
*
@@ -236,38 +235,8 @@ public Disposable schedulePeriodically(Runnable run, final long initialDelay, fi
236235
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
237236
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
238237

239-
first.replace(schedule(new Runnable() {
240-
long count;
241-
long lastNowNanoseconds = firstNowNanoseconds;
242-
long startInNanoseconds = firstStartInNanoseconds;
243-
@Override
244-
public void run() {
245-
decoratedRun.run();
246-
247-
if (!sd.isDisposed()) {
248-
249-
long nextTick;
250-
251-
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
252-
// If the clock moved in a direction quite a bit, rebase the repetition period
253-
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
254-
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
255-
nextTick = nowNanoseconds + periodInNanoseconds;
256-
/*
257-
* Shift the start point back by the drift as if the whole thing
258-
* started count periods ago.
259-
*/
260-
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
261-
} else {
262-
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
263-
}
264-
lastNowNanoseconds = nowNanoseconds;
265-
266-
long delay = nextTick - nowNanoseconds;
267-
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
268-
}
269-
}
270-
}, initialDelay, unit));
238+
first.replace(schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
239+
periodInNanoseconds), initialDelay, unit));
271240

272241
return sd;
273242
}
@@ -281,7 +250,60 @@ public void run() {
281250
public long now(TimeUnit unit) {
282251
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
283252
}
284-
253+
254+
/**
255+
* Holds state and logic to calculate when the next delayed invocation
256+
* of this task has to happen (accounting for clock drifts).
257+
*/
258+
final class PeriodicTask implements Runnable {
259+
final long firstStartInNanoseconds;
260+
final Runnable decoratedRun;
261+
final long firstNowNanoseconds;
262+
final SequentialDisposable sd;
263+
final long periodInNanoseconds;
264+
long count;
265+
long lastNowNanoseconds;
266+
long startInNanoseconds;
267+
268+
PeriodicTask(long firstStartInNanoseconds, Runnable decoratedRun,
269+
long firstNowNanoseconds, SequentialDisposable sd, long periodInNanoseconds) {
270+
this.firstStartInNanoseconds = firstStartInNanoseconds;
271+
this.decoratedRun = decoratedRun;
272+
this.firstNowNanoseconds = firstNowNanoseconds;
273+
this.sd = sd;
274+
this.periodInNanoseconds = periodInNanoseconds;
275+
lastNowNanoseconds = firstNowNanoseconds;
276+
startInNanoseconds = firstStartInNanoseconds;
277+
}
278+
279+
@Override
280+
public void run() {
281+
decoratedRun.run();
282+
283+
if (!sd.isDisposed()) {
284+
285+
long nextTick;
286+
287+
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
288+
// If the clock moved in a direction quite a bit, rebase the repetition period
289+
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
290+
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
291+
nextTick = nowNanoseconds + periodInNanoseconds;
292+
/*
293+
* Shift the start point back by the drift as if the whole thing
294+
* started count periods ago.
295+
*/
296+
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
297+
} else {
298+
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
299+
}
300+
lastNowNanoseconds = nowNanoseconds;
301+
302+
long delay = nextTick - nowNanoseconds;
303+
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
304+
}
305+
}
306+
}
285307
}
286308

287309
static class PeriodicDirectTask

src/main/java/io/reactivex/Single.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ public static <T, U> Single<T> using(
801801
* @param source the source to wrap
802802
* @return the Single wrapper or the source cast to Single (if possible)
803803
*/
804-
static <T> Single<T> wrap(SingleSource<T> source) {
804+
public static <T> Single<T> wrap(SingleSource<T> source) {
805805
ObjectHelper.requireNonNull(source, "source is null");
806806
if (source instanceof Single) {
807807
return RxJavaPlugins.onAssembly((Single<T>)source);
@@ -1824,7 +1824,7 @@ public final Single<T> onErrorReturn(final Function<Throwable, ? extends T> resu
18241824
* @return the new Single instance
18251825
* @since 2.0
18261826
*/
1827-
public final Single<T> onErrorReturnValue(final T value) {
1827+
public final Single<T> onErrorReturnItem(final T value) {
18281828
ObjectHelper.requireNonNull(value, "value is null");
18291829
return RxJavaPlugins.onAssembly(new SingleOnErrorReturn<T>(this, null, value));
18301830
}
@@ -2036,16 +2036,6 @@ public final Single<T> retryWhen(Function<? super Flowable<? extends Throwable>,
20362036
return toFlowable().retryWhen(handler).toSingle();
20372037
}
20382038

2039-
/**
2040-
* Subscribes the given Reactive-Streams Subscriber to this Single with a safety wrapper
2041-
* that handles exceptions thrown from the Subscriber's onXXX methods.
2042-
* @param s the Subscriber to wrap and subscribe to the current Single
2043-
* @since 2.0
2044-
*/
2045-
public final void safeSubscribe(Subscriber<? super T> s) {
2046-
toFlowable().safeSubscribe(s);
2047-
}
2048-
20492039
/**
20502040
* Subscribes to a Single but ignore its emission or notification.
20512041
* <dl>

0 commit comments

Comments
 (0)