Skip to content

Commit 1a745a1

Browse files
authored
3.x: TestScheduler option to use onSchedule hook (#7163)
1 parent 80a4842 commit 1a745a1

File tree

3 files changed

+160
-11
lines changed

3 files changed

+160
-11
lines changed

src/main/java/io/reactivex/rxjava3/schedulers/Schedulers.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,8 @@ public static Scheduler computation() {
180180
* @{@link io.reactivex.rxjava3.annotations.SchedulerSupport SchedulerSupport}({@link io.reactivex.rxjava3.annotations.SchedulerSupport#IO IO})
181181
* annotation.
182182
* <p>
183-
* When the {@link Scheduler.Worker} is disposed, the underlying worker can be released to the cached worker pool in two modes:
183+
* When the {@link io.reactivex.rxjava3.core.Scheduler.Worker Scheduler.Worker} is disposed,
184+
* the underlying worker can be released to the cached worker pool in two modes:
184185
* <ul>
185186
* <li>In <em>eager</em> mode (default), the underlying worker is returned immediately to the cached worker pool
186187
* and can be reused much quicker by operators. The drawback is that if the currently running task doesn't

src/main/java/io/reactivex/rxjava3/schedulers/TestScheduler.java

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,28 @@
1515

1616
import java.util.Queue;
1717
import java.util.concurrent.*;
18+
import java.util.concurrent.atomic.AtomicReference;
1819

19-
import io.reactivex.rxjava3.annotations.NonNull;
20+
import io.reactivex.rxjava3.annotations.*;
2021
import io.reactivex.rxjava3.core.Scheduler;
21-
import io.reactivex.rxjava3.disposables.*;
22+
import io.reactivex.rxjava3.disposables.Disposable;
2223
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
24+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2325

2426
/**
2527
* A special, non thread-safe scheduler for testing operators that require
2628
* a scheduler without introducing real concurrency and allows manually advancing
2729
* a virtual time.
30+
* <p>
31+
* By default, the tasks submitted via the various {@code schedule} methods are not
32+
* wrapped by the {@link RxJavaPlugins#onSchedule(Runnable)} hook. To enable this behavior,
33+
* create a {@code TestScheduler} via {@link #TestScheduler(boolean)} or {@link #TestScheduler(long, TimeUnit, boolean)}.
2834
*/
2935
public final class TestScheduler extends Scheduler {
3036
/** The ordered queue for the runnable tasks. */
3137
final Queue<TimedRunnable> queue = new PriorityBlockingQueue<>(11);
38+
/** Use the {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks. */
39+
final boolean useOnScheduleHook;
3240
/** The per-scheduler global order counter. */
3341
long counter;
3442
// Storing time in nanoseconds internally.
@@ -38,7 +46,20 @@ public final class TestScheduler extends Scheduler {
3846
* Creates a new TestScheduler with initial virtual time of zero.
3947
*/
4048
public TestScheduler() {
41-
// No-op.
49+
this(false);
50+
}
51+
52+
/**
53+
* Creates a new TestScheduler with the option to use the
54+
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
55+
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
56+
* TestScheduler is wrapped via the
57+
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
58+
* @since 3.0.10 - experimental
59+
*/
60+
@Experimental
61+
public TestScheduler(boolean useOnScheduleHook) {
62+
this.useOnScheduleHook = useOnScheduleHook;
4263
}
4364

4465
/**
@@ -50,7 +71,27 @@ public TestScheduler() {
5071
* the units of time that {@code delayTime} is expressed in
5172
*/
5273
public TestScheduler(long delayTime, TimeUnit unit) {
74+
this(delayTime, unit, false);
75+
}
76+
77+
/**
78+
* Creates a new TestScheduler with the specified initial virtual time
79+
* and with the option to use the
80+
* {@link RxJavaPlugins#onSchedule(Runnable)} hook when scheduling tasks.
81+
*
82+
* @param delayTime
83+
* the point in time to move the Scheduler's clock to
84+
* @param unit
85+
* the units of time that {@code delayTime} is expressed in
86+
* @param useOnScheduleHook if {@code true}, the tasks submitted to this
87+
* TestScheduler is wrapped via the
88+
* {@link RxJavaPlugins#onSchedule(Runnable)} hook
89+
* @since 3.0.10 - experimental
90+
*/
91+
@Experimental
92+
public TestScheduler(long delayTime, TimeUnit unit, boolean useOnScheduleHook) {
5393
time = unit.toNanos(delayTime);
94+
this.useOnScheduleHook = useOnScheduleHook;
5495
}
5596

5697
static final class TimedRunnable implements Comparable<TimedRunnable> {
@@ -163,10 +204,13 @@ public Disposable schedule(@NonNull Runnable run, long delayTime, @NonNull TimeU
163204
if (disposed) {
164205
return EmptyDisposable.INSTANCE;
165206
}
207+
if (useOnScheduleHook) {
208+
run = RxJavaPlugins.onSchedule(run);
209+
}
166210
final TimedRunnable timedAction = new TimedRunnable(this, time + unit.toNanos(delayTime), run, counter++);
167211
queue.add(timedAction);
168212

169-
return Disposable.fromRunnable(new QueueRemove(timedAction));
213+
return new QueueRemove(timedAction);
170214
}
171215

172216
@NonNull
@@ -175,26 +219,38 @@ public Disposable schedule(@NonNull Runnable run) {
175219
if (disposed) {
176220
return EmptyDisposable.INSTANCE;
177221
}
222+
if (useOnScheduleHook) {
223+
run = RxJavaPlugins.onSchedule(run);
224+
}
178225
final TimedRunnable timedAction = new TimedRunnable(this, 0, run, counter++);
179226
queue.add(timedAction);
180-
return Disposable.fromRunnable(new QueueRemove(timedAction));
227+
return new QueueRemove(timedAction);
181228
}
182229

183230
@Override
184231
public long now(@NonNull TimeUnit unit) {
185232
return TestScheduler.this.now(unit);
186233
}
187234

188-
final class QueueRemove implements Runnable {
189-
final TimedRunnable timedAction;
235+
final class QueueRemove extends AtomicReference<TimedRunnable> implements Disposable {
236+
237+
private static final long serialVersionUID = -7874968252110604360L;
190238

191239
QueueRemove(TimedRunnable timedAction) {
192-
this.timedAction = timedAction;
240+
this.lazySet(timedAction);
241+
}
242+
243+
@Override
244+
public void dispose() {
245+
TimedRunnable tr = getAndSet(null);
246+
if (tr != null) {
247+
queue.remove(tr);
248+
}
193249
}
194250

195251
@Override
196-
public void run() {
197-
queue.remove(timedAction);
252+
public boolean isDisposed() {
253+
return get() == null;
198254
}
199255
}
200256
}

src/test/java/io/reactivex/rxjava3/schedulers/TestSchedulerTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.rxjava3.functions.Function;
3131
import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription;
3232
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
33+
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3334
import io.reactivex.rxjava3.schedulers.TestScheduler.*;
3435

3536
public class TestSchedulerTest extends RxJavaTest {
@@ -260,4 +261,95 @@ public void constructorTimeSetsTime() {
260261
assertEquals(5, ts.now(TimeUnit.SECONDS));
261262
assertEquals(5000, ts.now(TimeUnit.MILLISECONDS));
262263
}
264+
265+
@Test
266+
public void withOnScheduleHook() {
267+
AtomicInteger run = new AtomicInteger();
268+
AtomicInteger counter = new AtomicInteger();
269+
RxJavaPlugins.setScheduleHandler(r -> {
270+
counter.getAndIncrement();
271+
return r;
272+
});
273+
try {
274+
Runnable r = () -> run.getAndIncrement();
275+
TestScheduler ts = new TestScheduler(true);
276+
277+
ts.createWorker().schedule(r);
278+
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);
279+
280+
ts.advanceTimeBy(1, TimeUnit.SECONDS);
281+
282+
assertEquals(2, run.get());
283+
assertEquals(2, counter.get());
284+
285+
ts = new TestScheduler();
286+
287+
ts.createWorker().schedule(r);
288+
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);
289+
290+
ts.advanceTimeBy(1, TimeUnit.SECONDS);
291+
292+
assertEquals(4, run.get());
293+
assertEquals(2, counter.get());
294+
} finally {
295+
RxJavaPlugins.setScheduleHandler(null);
296+
}
297+
}
298+
299+
@Test
300+
public void withOnScheduleHookInitialTime() {
301+
AtomicInteger run = new AtomicInteger();
302+
AtomicInteger counter = new AtomicInteger();
303+
RxJavaPlugins.setScheduleHandler(r -> {
304+
counter.getAndIncrement();
305+
return r;
306+
});
307+
try {
308+
Runnable r = () -> run.getAndIncrement();
309+
TestScheduler ts = new TestScheduler(1, TimeUnit.HOURS, true);
310+
311+
ts.createWorker().schedule(r);
312+
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);
313+
314+
ts.advanceTimeBy(1, TimeUnit.SECONDS);
315+
316+
assertEquals(2, run.get());
317+
assertEquals(2, counter.get());
318+
319+
ts = new TestScheduler(1, TimeUnit.HOURS);
320+
321+
ts.createWorker().schedule(r);
322+
ts.createWorker().schedule(r, 1, TimeUnit.SECONDS);
323+
324+
ts.advanceTimeBy(1, TimeUnit.SECONDS);
325+
326+
assertEquals(4, run.get());
327+
assertEquals(2, counter.get());
328+
} finally {
329+
RxJavaPlugins.setScheduleHandler(null);
330+
}
331+
}
332+
333+
@Test
334+
public void disposeWork() {
335+
AtomicInteger run = new AtomicInteger();
336+
Runnable r = () -> run.getAndIncrement();
337+
TestScheduler ts = new TestScheduler(1, TimeUnit.HOURS, true);
338+
339+
Disposable d = ts.createWorker().schedule(r);
340+
341+
assertFalse(d.isDisposed());
342+
343+
d.dispose();
344+
345+
assertTrue(d.isDisposed());
346+
347+
d.dispose();
348+
349+
assertTrue(d.isDisposed());
350+
351+
ts.advanceTimeBy(1, TimeUnit.SECONDS);
352+
353+
assertEquals(0, run.get());
354+
}
263355
}

0 commit comments

Comments
 (0)