Skip to content

Commit 3b8d393

Browse files
committed
Fixed schedule race and task retention with ExecutorScheduler.
1 parent 51e03cc commit 3b8d393

File tree

2 files changed

+163
-74
lines changed

2 files changed

+163
-74
lines changed

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 33 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,14 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.Executor;
20-
import java.util.concurrent.Future;
21-
import java.util.concurrent.RejectedExecutionException;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
2419
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26-
import rx.Scheduler;
27-
import rx.Subscription;
20+
21+
import rx.*;
2822
import rx.functions.Action0;
23+
import rx.internal.schedulers.ScheduledAction;
2924
import rx.plugins.RxJavaPlugins;
30-
import rx.subscriptions.CompositeSubscription;
31-
import rx.subscriptions.MultipleAssignmentSubscription;
32-
import rx.subscriptions.Subscriptions;
25+
import rx.subscriptions.*;
3326

3427
/**
3528
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
@@ -58,12 +51,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
5851
// TODO: use a better performing structure for task tracking
5952
final CompositeSubscription tasks;
6053
// TODO: use MpscLinkedQueue once available
61-
final ConcurrentLinkedQueue<ExecutorAction> queue;
54+
final ConcurrentLinkedQueue<ScheduledAction> queue;
6255
final AtomicInteger wip;
6356

6457
public ExecutorSchedulerWorker(Executor executor) {
6558
this.executor = executor;
66-
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
59+
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
6760
this.wip = new AtomicInteger();
6861
this.tasks = new CompositeSubscription();
6962
}
@@ -73,11 +66,15 @@ public Subscription schedule(Action0 action) {
7366
if (isUnsubscribed()) {
7467
return Subscriptions.unsubscribed();
7568
}
76-
ExecutorAction ea = new ExecutorAction(action, tasks);
69+
ScheduledAction ea = new ScheduledAction(action, tasks);
7770
tasks.add(ea);
7871
queue.offer(ea);
7972
if (wip.getAndIncrement() == 0) {
8073
try {
74+
// note that since we schedule the emission of potentially multiple tasks
75+
// there is no clear way to cancel this schedule from individual tasks
76+
// so even if executor is an ExecutorService, we can't associate the future
77+
// returned by submit() with any particular ScheduledAction
8178
executor.execute(this);
8279
} catch (RejectedExecutionException t) {
8380
// cleanup if rejected
@@ -96,7 +93,10 @@ public Subscription schedule(Action0 action) {
9693
@Override
9794
public void run() {
9895
do {
99-
queue.poll().run();
96+
ScheduledAction sa = queue.poll();
97+
if (!sa.isUnsubscribed()) {
98+
sa.run();
99+
}
100100
} while (wip.decrementAndGet() > 0);
101101
}
102102

@@ -115,21 +115,26 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
115115
service = GenericScheduledExecutorService.getInstance();
116116
}
117117

118+
final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
118119
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
119-
// tasks.add(mas); // Needs a removal without unsubscription
120+
mas.set(first);
121+
tasks.add(mas); // Needs a removal without unsubscription
120122

121-
try {
122-
Future<?> f = service.schedule(new Runnable() {
123-
@Override
124-
public void run() {
125-
if (mas.isUnsubscribed()) {
126-
return;
127-
}
128-
mas.set(schedule(action));
129-
// tasks.delete(mas); // Needs a removal without unsubscription
123+
ScheduledAction ea = new ScheduledAction(new Action0() {
124+
@Override
125+
public void call() {
126+
if (mas.isUnsubscribed()) {
127+
return;
130128
}
131-
}, delayTime, unit);
132-
mas.set(Subscriptions.from(f));
129+
mas.set(schedule(action));
130+
}
131+
}, tasks);
132+
first.set(ea);
133+
tasks.add(ea);
134+
135+
try {
136+
Future<?> f = service.schedule(ea, delayTime, unit);
137+
ea.add(f);
133138
} catch (RejectedExecutionException t) {
134139
// report the rejection to plugins
135140
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
@@ -150,46 +155,4 @@ public void unsubscribe() {
150155
}
151156

152157
}
153-
154-
/** Runs the actual action and maintains an unsubscription state. */
155-
static final class ExecutorAction implements Runnable, Subscription {
156-
final Action0 actual;
157-
final CompositeSubscription parent;
158-
volatile int unsubscribed;
159-
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
160-
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
161-
162-
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
163-
this.actual = actual;
164-
this.parent = parent;
165-
}
166-
167-
@Override
168-
public void run() {
169-
if (isUnsubscribed()) {
170-
return;
171-
}
172-
try {
173-
actual.call();
174-
} catch (Throwable t) {
175-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
176-
Thread thread = Thread.currentThread();
177-
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
178-
} finally {
179-
unsubscribe();
180-
}
181-
}
182-
@Override
183-
public boolean isUnsubscribed() {
184-
return unsubscribed != 0;
185-
}
186-
187-
@Override
188-
public void unsubscribe() {
189-
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
190-
parent.remove(this);
191-
}
192-
}
193-
194-
}
195158
}

src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 130 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,19 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import java.lang.management.*;
19+
import java.util.concurrent.*;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import static org.junit.Assert.*;
23+
1824
import org.junit.Test;
19-
import rx.Scheduler;
20-
import rx.internal.util.RxThreadFactory;
2125

22-
import java.util.concurrent.Executor;
23-
import java.util.concurrent.Executors;
26+
import rx.*;
27+
import rx.Scheduler.Worker;
28+
import rx.functions.*;
29+
import rx.internal.schedulers.NewThreadWorker;
30+
import rx.internal.util.RxThreadFactory;
2431

2532
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
2633

@@ -40,4 +47,123 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
4047
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
4148
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
4249
}
50+
@Test(timeout = 30000)
51+
public void testCancelledTaskRetention() throws InterruptedException {
52+
System.out.println("Wait before GC");
53+
Thread.sleep(1000);
54+
55+
System.out.println("GC");
56+
System.gc();
57+
58+
Thread.sleep(1000);
59+
60+
61+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
62+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
63+
long initial = memHeap.getUsed();
64+
65+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
66+
67+
Scheduler.Worker w = Schedulers.io().createWorker();
68+
for (int i = 0; i < 500000; i++) {
69+
if (i % 50000 == 0) {
70+
System.out.println(" -> still scheduling: " + i);
71+
}
72+
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
73+
}
74+
75+
memHeap = memoryMXBean.getHeapMemoryUsage();
76+
long after = memHeap.getUsed();
77+
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
78+
79+
w.unsubscribe();
80+
81+
System.out.println("Wait before second GC");
82+
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);
83+
84+
System.out.println("Second GC");
85+
System.gc();
86+
87+
Thread.sleep(1000);
88+
89+
memHeap = memoryMXBean.getHeapMemoryUsage();
90+
long finish = memHeap.getUsed();
91+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
92+
93+
if (finish > initial * 5) {
94+
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
95+
}
96+
}
97+
98+
/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
99+
static final class TestExecutor implements Executor {
100+
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
101+
@Override
102+
public void execute(Runnable command) {
103+
queue.offer(command);
104+
}
105+
public void executeOne() {
106+
Runnable r = queue.poll();
107+
if (r != null) {
108+
r.run();
109+
}
110+
}
111+
public void executeAll() {
112+
Runnable r;
113+
while ((r = queue.poll()) != null) {
114+
r.run();
115+
}
116+
}
117+
}
118+
119+
@Test
120+
public void testCancelledTasksDontRun() {
121+
final AtomicInteger calls = new AtomicInteger();
122+
Action0 task = new Action0() {
123+
@Override
124+
public void call() {
125+
calls.getAndIncrement();
126+
}
127+
};
128+
TestExecutor exec = new TestExecutor();
129+
Scheduler custom = Schedulers.from(exec);
130+
Worker w = custom.createWorker();
131+
try {
132+
Subscription s1 = w.schedule(task);
133+
Subscription s2 = w.schedule(task);
134+
Subscription s3 = w.schedule(task);
135+
136+
s1.unsubscribe();
137+
s2.unsubscribe();
138+
s3.unsubscribe();
139+
140+
exec.executeAll();
141+
142+
assertEquals(0, calls.get());
143+
} finally {
144+
w.unsubscribe();
145+
}
146+
}
147+
@Test
148+
public void testCancelledWorkerDoesntRunTasks() {
149+
final AtomicInteger calls = new AtomicInteger();
150+
Action0 task = new Action0() {
151+
@Override
152+
public void call() {
153+
calls.getAndIncrement();
154+
}
155+
};
156+
TestExecutor exec = new TestExecutor();
157+
Scheduler custom = Schedulers.from(exec);
158+
Worker w = custom.createWorker();
159+
try {
160+
w.schedule(task);
161+
w.schedule(task);
162+
w.schedule(task);
163+
} finally {
164+
w.unsubscribe();
165+
}
166+
exec.executeAll();
167+
assertEquals(0, calls.get());
168+
}
43169
}

0 commit comments

Comments
 (0)