Skip to content

3.x: Introduce property rx3.scheduler.use-nanotime #7169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions src/main/java/io/reactivex/rxjava3/core/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@
* interface which can grant access to the original or hooked {@code Runnable}, thus, a repeated {@code RxJavaPlugins.onSchedule}
* can detect the earlier hook and not apply a new one over again.
* <p>
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Scheduler} implementations can override this
* The default implementation of {@link #now(TimeUnit)} and {@link Worker#now(TimeUnit)} methods to return current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Scheduler} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a {@code Scheduler} may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand All @@ -88,6 +89,34 @@
* All methods on the {@code Scheduler} and {@code Worker} classes should be thread safe.
*/
public abstract class Scheduler {
/**
* Value representing whether to use {@link System#nanoTime()}, or default as clock for {@link #now(TimeUnit)}
* and {@link Scheduler.Worker#now(TimeUnit)}
* <p>
* Associated system parameter:
* <ul>
* <li>{@code rx3.scheduler.use-nanotime}, boolean, default {@code false}
* </ul>
*/
static boolean IS_DRIFT_USE_NANOTIME = Boolean.getBoolean("rx3.scheduler.use-nanotime");

/**
* Returns the current clock time depending on state of {@link Scheduler#IS_DRIFT_USE_NANOTIME} in given {@code unit}
* <p>
* By default {@link System#currentTimeMillis()} will be used as the clock. When the property is set
* {@link System#nanoTime()} will be used.
* <p>
* @param unit the time unit
* @return the 'current time' in given unit
* @throws NullPointerException if {@code unit} is {@code null}
*/
static long computeNow(TimeUnit unit) {
if(!IS_DRIFT_USE_NANOTIME) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
}

/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
Expand Down Expand Up @@ -156,7 +185,7 @@ public static long clockDriftTolerance() {
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down Expand Up @@ -362,8 +391,9 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
* track the individual {@code Runnable} tasks while they are waiting to be executed (with or without delay) so that
* {@link #dispose()} can prevent their execution or potentially interrupt them if they are currently running.
* <p>
* The default implementation of the {@link #now(TimeUnit)} method returns current
* {@link System#currentTimeMillis()} value in the desired time unit. Custom {@code Worker} implementations can override this
* The default implementation of the {@link #now(TimeUnit)} method returns current {@link System#currentTimeMillis()}
* value in the desired time unit, unless {@code rx3.scheduler.use-nanotime} (boolean) is set. When the property is set to
* {@code true}, the method uses {@link System#nanoTime()} as its basis instead. Custom {@code Worker} implementations can override this
* to provide specialized time accounting (such as virtual time to be advanced programmatically).
* Note that operators requiring a scheduler may rely on either of the {@code now()} calls provided by
* {@code Scheduler} or {@code Worker} respectively, therefore, it is recommended they represent a logically
Expand Down Expand Up @@ -482,7 +512,7 @@ public Disposable schedulePeriodically(@NonNull Runnable run, final long initial
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
return computeNow(unit);
}

/**
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
* <li>{@code rx3.single-priority} (int): sets the thread priority of the {@link #single()} {@code Scheduler}, default is {@link Thread#NORM_PRIORITY}</li>
* <li>{@code rx3.purge-enabled} (boolean): enables periodic purging of all {@code Scheduler}'s backing thread pools, default is {@code false}</li>
* <li>{@code rx3.purge-period-seconds} (int): specifies the periodic purge interval of all {@code Scheduler}'s backing thread pools, default is 1 second</li>
* <li>{@code rx3.scheduler.use-nanotime} (boolean): {@code true} instructs {@code Scheduler} to use {@link System#nanoTime()} for {@link Scheduler#now(TimeUnit)},
* instead of default {@link System#currentTimeMillis()} ({@code false})</li>
* </ul>
*/
public final class Schedulers {
Expand Down
38 changes: 38 additions & 0 deletions src/test/java/io/reactivex/rxjava3/core/SchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,48 @@
package io.reactivex.rxjava3.core;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.After;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class SchedulerTest {
private static final String DRIFT_USE_NANOTIME = "rx3.scheduler.use-nanotime";

@After
public void cleanup() {
// reset value to default in order to not influence other tests
Scheduler.IS_DRIFT_USE_NANOTIME = false;
}

@Test
public void driftUseNanoTimeNotSetByDefault() {
assertFalse(Scheduler.IS_DRIFT_USE_NANOTIME);
assertFalse(Boolean.getBoolean(DRIFT_USE_NANOTIME));
}

@Test
public void computeNow_currentTimeMillis() {
TimeUnit unit = TimeUnit.MILLISECONDS;
assertTrue(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
}

@Test
public void computeNow_nanoTime() {
TimeUnit unit = TimeUnit.NANOSECONDS;
Scheduler.IS_DRIFT_USE_NANOTIME = true;

assertFalse(isInRange(System.currentTimeMillis(), Scheduler.computeNow(unit), unit, 250, TimeUnit.MILLISECONDS));
assertTrue(isInRange(System.nanoTime(), Scheduler.computeNow(unit), TimeUnit.NANOSECONDS, 250, TimeUnit.MILLISECONDS));
}

private boolean isInRange(long start, long stop, TimeUnit source, long maxDiff, TimeUnit diffUnit) {
long diff = Math.abs(stop - start);
return diffUnit.convert(diff, source) <= maxDiff;
}

@Test
public void clockDriftCalculation() {
Expand Down