Skip to content

Commit b7086ef

Browse files
authored
2.x: fix periodic scheduling with negative period causing IAE (#5419)
1 parent f64fdd9 commit b7086ef

File tree

5 files changed

+279
-3
lines changed

5 files changed

+279
-3
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.internal.schedulers;
18+
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.internal.functions.Functions;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
26+
/**
27+
* Wrapper for a regular task that gets immediately rescheduled when the task completed.
28+
*/
29+
final class InstantPeriodicTask implements Callable<Void>, Disposable {
30+
31+
final Runnable task;
32+
33+
final AtomicReference<Future<?>> rest;
34+
35+
final AtomicReference<Future<?>> first;
36+
37+
final ExecutorService executor;
38+
39+
Thread runner;
40+
41+
static final FutureTask<Void> CANCELLED = new FutureTask<Void>(Functions.EMPTY_RUNNABLE, null);
42+
43+
InstantPeriodicTask(Runnable task, ExecutorService executor) {
44+
super();
45+
this.task = task;
46+
this.first = new AtomicReference<Future<?>>();
47+
this.rest = new AtomicReference<Future<?>>();
48+
this.executor = executor;
49+
}
50+
51+
@Override
52+
public Void call() throws Exception {
53+
try {
54+
runner = Thread.currentThread();
55+
try {
56+
task.run();
57+
setRest(executor.submit(this));
58+
} catch (Throwable ex) {
59+
RxJavaPlugins.onError(ex);
60+
}
61+
} finally {
62+
runner = null;
63+
}
64+
return null;
65+
}
66+
67+
@Override
68+
public void dispose() {
69+
Future<?> current = first.getAndSet(CANCELLED);
70+
if (current != null && current != CANCELLED) {
71+
current.cancel(runner != Thread.currentThread());
72+
}
73+
current = rest.getAndSet(CANCELLED);
74+
if (current != null && current != CANCELLED) {
75+
current.cancel(runner != Thread.currentThread());
76+
}
77+
}
78+
79+
@Override
80+
public boolean isDisposed() {
81+
return first.get() == CANCELLED;
82+
}
83+
84+
void setFirst(Future<?> f) {
85+
for (;;) {
86+
Future<?> current = first.get();
87+
if (current == CANCELLED) {
88+
f.cancel(runner != Thread.currentThread());
89+
}
90+
if (first.compareAndSet(current, f)) {
91+
return;
92+
}
93+
}
94+
}
95+
96+
void setRest(Future<?> f) {
97+
for (;;) {
98+
Future<?> current = rest.get();
99+
if (current == CANCELLED) {
100+
f.cancel(runner != Thread.currentThread());
101+
}
102+
if (rest.compareAndSet(current, f)) {
103+
return;
104+
}
105+
}
106+
}
107+
}

src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,27 @@ public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit un
8585
* @param unit the time unit for both the initialDelay and period
8686
* @return the ScheduledRunnable instance
8787
*/
88-
public Disposable schedulePeriodicallyDirect(final Runnable run, long initialDelay, long period, TimeUnit unit) {
89-
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
88+
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
89+
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
90+
if (period <= 0L) {
91+
92+
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
93+
try {
94+
Future<?> f;
95+
if (initialDelay <= 0L) {
96+
f = executor.submit(periodicWrapper);
97+
} else {
98+
f = executor.schedule(periodicWrapper, initialDelay, unit);
99+
}
100+
periodicWrapper.setFirst(f);
101+
} catch (RejectedExecutionException ex) {
102+
RxJavaPlugins.onError(ex);
103+
return EmptyDisposable.INSTANCE;
104+
}
105+
106+
return periodicWrapper;
107+
}
108+
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
90109
try {
91110
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
92111
task.setFuture(f);

src/main/java/io/reactivex/internal/schedulers/SingleScheduler.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,28 @@ public Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit uni
125125
@NonNull
126126
@Override
127127
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit) {
128-
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(RxJavaPlugins.onSchedule(run));
128+
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
129+
if (period <= 0L) {
130+
131+
ScheduledExecutorService exec = executor.get();
132+
133+
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, exec);
134+
Future<?> f;
135+
try {
136+
if (initialDelay <= 0L) {
137+
f = exec.submit(periodicWrapper);
138+
} else {
139+
f = exec.schedule(periodicWrapper, initialDelay, unit);
140+
}
141+
periodicWrapper.setFirst(f);
142+
} catch (RejectedExecutionException ex) {
143+
RxJavaPlugins.onError(ex);
144+
return EmptyDisposable.INSTANCE;
145+
}
146+
147+
return periodicWrapper;
148+
}
149+
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
129150
try {
130151
Future<?> f = executor.get().scheduleAtFixedRate(task, initialDelay, period, unit);
131152
task.setFuture(f);

src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.*;
2323
import io.reactivex.Scheduler.Worker;
2424
import io.reactivex.disposables.*;
25+
import io.reactivex.internal.disposables.SequentialDisposable;
2526
import io.reactivex.internal.functions.Functions;
2627
import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker;
2728
import io.reactivex.schedulers.Schedulers;
@@ -117,4 +118,64 @@ public void runnableDisposedAsyncTimed() throws Exception {
117118
Thread.sleep(1);
118119
}
119120
}
121+
122+
@Test(timeout = 10000)
123+
public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
124+
Scheduler s = Schedulers.single();
125+
126+
for (int initial = 0; initial < 2; initial++) {
127+
final CountDownLatch cdl = new CountDownLatch(1);
128+
129+
final SequentialDisposable sd = new SequentialDisposable();
130+
131+
try {
132+
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
133+
int count;
134+
@Override
135+
public void run() {
136+
if (++count == 10) {
137+
sd.dispose();
138+
cdl.countDown();
139+
}
140+
}
141+
}, initial, 0, TimeUnit.MILLISECONDS));
142+
143+
assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
144+
} finally {
145+
sd.dispose();
146+
}
147+
}
148+
}
149+
150+
@Test(timeout = 10000)
151+
public void schedulePeriodicallyZeroPeriod() throws Exception {
152+
Scheduler s = Schedulers.single();
153+
154+
for (int initial = 0; initial < 2; initial++) {
155+
156+
final CountDownLatch cdl = new CountDownLatch(1);
157+
158+
final SequentialDisposable sd = new SequentialDisposable();
159+
160+
Scheduler.Worker w = s.createWorker();
161+
162+
try {
163+
sd.replace(w.schedulePeriodically(new Runnable() {
164+
int count;
165+
@Override
166+
public void run() {
167+
if (++count == 10) {
168+
sd.dispose();
169+
cdl.countDown();
170+
}
171+
}
172+
}, initial, 0, TimeUnit.MILLISECONDS));
173+
174+
assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
175+
} finally {
176+
sd.dispose();
177+
w.dispose();
178+
}
179+
}
180+
}
120181
}

src/test/java/io/reactivex/schedulers/AbstractSchedulerTests.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import io.reactivex.*;
3030
import io.reactivex.disposables.Disposable;
3131
import io.reactivex.functions.*;
32+
import io.reactivex.internal.disposables.SequentialDisposable;
3233
import io.reactivex.internal.schedulers.TrampolineScheduler;
3334
import io.reactivex.internal.subscriptions.*;
3435
import io.reactivex.subscribers.DefaultSubscriber;
@@ -558,4 +559,71 @@ public void run() {
558559
}
559560
assertTrue(d.isDisposed());
560561
}
562+
563+
@Test(timeout = 10000)
564+
public void schedulePeriodicallyDirectZeroPeriod() throws Exception {
565+
Scheduler s = getScheduler();
566+
if (s instanceof TrampolineScheduler) {
567+
// can't properly stop a trampolined periodic task
568+
return;
569+
}
570+
571+
for (int initial = 0; initial < 2; initial++) {
572+
final CountDownLatch cdl = new CountDownLatch(1);
573+
574+
final SequentialDisposable sd = new SequentialDisposable();
575+
576+
try {
577+
sd.replace(s.schedulePeriodicallyDirect(new Runnable() {
578+
int count;
579+
@Override
580+
public void run() {
581+
if (++count == 10) {
582+
sd.dispose();
583+
cdl.countDown();
584+
}
585+
}
586+
}, initial, 0, TimeUnit.MILLISECONDS));
587+
588+
assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
589+
} finally {
590+
sd.dispose();
591+
}
592+
}
593+
}
594+
595+
@Test(timeout = 10000)
596+
public void schedulePeriodicallyZeroPeriod() throws Exception {
597+
Scheduler s = getScheduler();
598+
if (s instanceof TrampolineScheduler) {
599+
// can't properly stop a trampolined periodic task
600+
return;
601+
}
602+
603+
for (int initial = 0; initial < 2; initial++) {
604+
final CountDownLatch cdl = new CountDownLatch(1);
605+
606+
final SequentialDisposable sd = new SequentialDisposable();
607+
608+
Scheduler.Worker w = s.createWorker();
609+
610+
try {
611+
sd.replace(w.schedulePeriodically(new Runnable() {
612+
int count;
613+
@Override
614+
public void run() {
615+
if (++count == 10) {
616+
sd.dispose();
617+
cdl.countDown();
618+
}
619+
}
620+
}, initial, 0, TimeUnit.MILLISECONDS));
621+
622+
assertTrue("" + initial, cdl.await(5, TimeUnit.SECONDS));
623+
} finally {
624+
sd.dispose();
625+
w.dispose();
626+
}
627+
}
628+
}
561629
}

0 commit comments

Comments
 (0)