Skip to content

2.x: Introduce property rx2.scheduler.use-nanotime (#7154) #7170

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
* can detect the earlier hook and not apply a new one over again.
* <p>
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx2.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand All @@ -89,6 +90,34 @@
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
* and {@link Scheduler.Worker#now(TimeUnit)}
* <p>
* Associated system parameter:
* <ul>
* <li>{@code rx2.scheduler.use-nanotime}, boolean, default {@code false}
* </ul>
*/
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx2.scheduler.use-nanotime");

/**
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
* <p>
* By default {@link System#currentTimeMillis()} will be used as the clock. When the property is set
* {@link System#nanoTime()} will be used.
* <p>
* @param unit the time unit
* @return the 'current time' in given unit
* @throws NullPointerException if {@code unit} is {@code null}
*/
static long computeNow(TimeUnit unit) {
if(!IS_DRIFT_USE_NANOTIME) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
}

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
Expand Down Expand Up @@ -131,7 +160,7 @@ public static long clockDriftTolerance() {
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down Expand Up @@ -332,8 +361,9 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
* <p>
* The default implementation of the {@link #now(TimeUnit)} method returns current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx2.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand Down Expand Up @@ -448,7 +478,7 @@ public Disposable schedulePeriodically(@NonNull Runnable run, final long initial
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
* <li>{@code rx2.single-priority} (int): sets the thread priority of the {@link #single()} Scheduler, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx2.purge-enabled} (boolean): enables periodic purging of all Scheduler's backing thread pools, default is false</li>
* <li>{@code rx2.purge-period-seconds} (int): specifies the periodic purge interval of all Scheduler's backing thread pools, default is 1 second</li>
* <li>{@code rx2.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
*/
public final class Schedulers {
Expand Down
61 changes: 61 additions & 0 deletions src/test/java/io/reactivex/SchedulerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.*;

/**
* Same as {@link io.reactivex.schedulers.SchedulerTest}, but different package, to access
* package-private fields.
*/
public class SchedulerTest {
private static final String DRIFT_USE_NANOTIME = "rx2.scheduler.use-nanotime";

@After
public void cleanup() {
// reset value to default in order to not influence other tests
Scheduler.IS_DRIFT_USE_NANOTIME = false;
}

@Test
public void driftUseNanoTimeNotSetByDefault() {
assertFalse(Scheduler.IS_DRIFT_USE_NANOTIME);
assertFalse(Boolean.getBoolean(DRIFT_USE_NANOTIME));
}

@Test
public void computeNow_currentTimeMillis() {
TimeUnit unit = TimeUnit.MILLISECONDS;
assertTrue(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
}

@Test
public void computeNow_nanoTime() {
TimeUnit unit = TimeUnit.NANOSECONDS;
Scheduler.IS_DRIFT_USE_NANOTIME = true;

assertFalse(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
assertTrue(isInRange(System.nanoTime(), Scheduler.computeNow(unit), TimeUnit.NANOSECONDS, 250, TimeUnit.MILLISECONDS));
}

private boolean isInRange(long start, long stop, TimeUnit source, long maxDiff, TimeUnit diffUnit) {
long diff = Math.abs(stop - start);
return diffUnit.convert(diff, source) <= maxDiff;
}
}