Skip to content

Commit e1d50e4

Browse files
Merge pull request #2907 from akarnokd/SchedulerFromFix
Fixed schedule race and task retention with ExecutorScheduler.
2 parents 47b098c + 491e615 commit e1d50e4

File tree

2 files changed

+223
-75
lines changed

2 files changed

+223
-75
lines changed

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 55 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,14 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.Executor;
20-
import java.util.concurrent.Future;
21-
import java.util.concurrent.RejectedExecutionException;
22-
import java.util.concurrent.ScheduledExecutorService;
23-
import java.util.concurrent.TimeUnit;
18+
import java.util.concurrent.*;
2419
import java.util.concurrent.atomic.AtomicInteger;
25-
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
26-
import rx.Scheduler;
27-
import rx.Subscription;
20+
21+
import rx.*;
2822
import rx.functions.Action0;
23+
import rx.internal.schedulers.ScheduledAction;
2924
import rx.plugins.RxJavaPlugins;
30-
import rx.subscriptions.CompositeSubscription;
31-
import rx.subscriptions.MultipleAssignmentSubscription;
32-
import rx.subscriptions.Subscriptions;
25+
import rx.subscriptions.*;
3326

3427
/**
3528
* Scheduler that wraps an Executor instance and establishes the Scheduler contract upon it.
@@ -58,12 +51,12 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
5851
// TODO: use a better performing structure for task tracking
5952
final CompositeSubscription tasks;
6053
// TODO: use MpscLinkedQueue once available
61-
final ConcurrentLinkedQueue<ExecutorAction> queue;
54+
final ConcurrentLinkedQueue<ScheduledAction> queue;
6255
final AtomicInteger wip;
6356

6457
public ExecutorSchedulerWorker(Executor executor) {
6558
this.executor = executor;
66-
this.queue = new ConcurrentLinkedQueue<ExecutorAction>();
59+
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
6760
this.wip = new AtomicInteger();
6861
this.tasks = new CompositeSubscription();
6962
}
@@ -73,11 +66,15 @@ public Subscription schedule(Action0 action) {
7366
if (isUnsubscribed()) {
7467
return Subscriptions.unsubscribed();
7568
}
76-
ExecutorAction ea = new ExecutorAction(action, tasks);
69+
ScheduledAction ea = new ScheduledAction(action, tasks);
7770
tasks.add(ea);
7871
queue.offer(ea);
7972
if (wip.getAndIncrement() == 0) {
8073
try {
74+
// note that since we schedule the emission of potentially multiple tasks
75+
// there is no clear way to cancel this schedule from individual tasks
76+
// so even if executor is an ExecutorService, we can't associate the future
77+
// returned by submit() with any particular ScheduledAction
8178
executor.execute(this);
8279
} catch (RejectedExecutionException t) {
8380
// cleanup if rejected
@@ -96,7 +93,10 @@ public Subscription schedule(Action0 action) {
9693
@Override
9794
public void run() {
9895
do {
99-
queue.poll().run();
96+
ScheduledAction sa = queue.poll();
97+
if (!sa.isUnsubscribed()) {
98+
sa.run();
99+
}
100100
} while (wip.decrementAndGet() > 0);
101101
}
102102

@@ -115,28 +115,54 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
115115
service = GenericScheduledExecutorService.getInstance();
116116
}
117117

118+
final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
118119
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
119-
// tasks.add(mas); // Needs a removal without unsubscription
120+
mas.set(first);
121+
tasks.add(mas);
122+
final Subscription removeMas = Subscriptions.create(new Action0() {
123+
@Override
124+
public void call() {
125+
tasks.remove(mas);
126+
}
127+
});
120128

121-
try {
122-
Future<?> f = service.schedule(new Runnable() {
123-
@Override
124-
public void run() {
125-
if (mas.isUnsubscribed()) {
126-
return;
127-
}
128-
mas.set(schedule(action));
129-
// tasks.delete(mas); // Needs a removal without unsubscription
129+
ScheduledAction ea = new ScheduledAction(new Action0() {
130+
@Override
131+
public void call() {
132+
if (mas.isUnsubscribed()) {
133+
return;
130134
}
131-
}, delayTime, unit);
132-
mas.set(Subscriptions.from(f));
135+
// schedule the real action untimed
136+
Subscription s2 = schedule(action);
137+
mas.set(s2);
138+
// unless the worker is unsubscribed, we should get a new ScheduledAction
139+
if (s2.getClass() == ScheduledAction.class) {
140+
// when this ScheduledAction completes, we need to remove the
141+
// MAS referencing the whole setup to avoid leaks
142+
((ScheduledAction)s2).add(removeMas);
143+
}
144+
}
145+
});
146+
// This will make sure if ea.call() gets executed before this line
147+
// we don't override the current task in mas.
148+
first.set(ea);
149+
// we don't need to add ea to tasks because it will be tracked through mas/first
150+
151+
152+
try {
153+
Future<?> f = service.schedule(ea, delayTime, unit);
154+
ea.add(f);
133155
} catch (RejectedExecutionException t) {
134156
// report the rejection to plugins
135157
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
136158
throw t;
137159
}
138160

139-
return mas;
161+
/*
162+
* This allows cancelling either the delayed schedule or the actual schedule referenced
163+
* by mas and makes sure mas is removed from the tasks composite to avoid leaks.
164+
*/
165+
return removeMas;
140166
}
141167

142168
@Override
@@ -150,46 +176,4 @@ public void unsubscribe() {
150176
}
151177

152178
}
153-
154-
/** Runs the actual action and maintains an unsubscription state. */
155-
static final class ExecutorAction implements Runnable, Subscription {
156-
final Action0 actual;
157-
final CompositeSubscription parent;
158-
volatile int unsubscribed;
159-
static final AtomicIntegerFieldUpdater<ExecutorAction> UNSUBSCRIBED_UPDATER
160-
= AtomicIntegerFieldUpdater.newUpdater(ExecutorAction.class, "unsubscribed");
161-
162-
public ExecutorAction(Action0 actual, CompositeSubscription parent) {
163-
this.actual = actual;
164-
this.parent = parent;
165-
}
166-
167-
@Override
168-
public void run() {
169-
if (isUnsubscribed()) {
170-
return;
171-
}
172-
try {
173-
actual.call();
174-
} catch (Throwable t) {
175-
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
176-
Thread thread = Thread.currentThread();
177-
thread.getUncaughtExceptionHandler().uncaughtException(thread, t);
178-
} finally {
179-
unsubscribe();
180-
}
181-
}
182-
@Override
183-
public boolean isUnsubscribed() {
184-
return unsubscribed != 0;
185-
}
186-
187-
@Override
188-
public void unsubscribe() {
189-
if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
190-
parent.remove(this);
191-
}
192-
}
193-
194-
}
195179
}

src/test/java/rx/schedulers/ExecutorSchedulerTest.java

Lines changed: 168 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,20 @@
1515
*/
1616
package rx.schedulers;
1717

18+
import static org.junit.Assert.*;
19+
20+
import java.lang.management.*;
21+
import java.util.concurrent.*;
22+
import java.util.concurrent.atomic.AtomicInteger;
23+
1824
import org.junit.Test;
19-
import rx.Scheduler;
20-
import rx.internal.util.RxThreadFactory;
2125

22-
import java.util.concurrent.Executor;
23-
import java.util.concurrent.Executors;
26+
import rx.*;
27+
import rx.Scheduler.Worker;
28+
import rx.functions.*;
29+
import rx.internal.schedulers.NewThreadWorker;
30+
import rx.internal.util.RxThreadFactory;
31+
import rx.schedulers.ExecutorScheduler.ExecutorSchedulerWorker;
2432

2533
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
2634

@@ -40,4 +48,160 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
4048
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
4149
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
4250
}
51+
@Test(timeout = 30000)
52+
public void testCancelledTaskRetention() throws InterruptedException {
53+
System.out.println("Wait before GC");
54+
Thread.sleep(1000);
55+
56+
System.out.println("GC");
57+
System.gc();
58+
59+
Thread.sleep(1000);
60+
61+
62+
MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
63+
MemoryUsage memHeap = memoryMXBean.getHeapMemoryUsage();
64+
long initial = memHeap.getUsed();
65+
66+
System.out.printf("Starting: %.3f MB%n", initial / 1024.0 / 1024.0);
67+
68+
Scheduler.Worker w = Schedulers.io().createWorker();
69+
for (int i = 0; i < 500000; i++) {
70+
if (i % 50000 == 0) {
71+
System.out.println(" -> still scheduling: " + i);
72+
}
73+
w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
74+
}
75+
76+
memHeap = memoryMXBean.getHeapMemoryUsage();
77+
long after = memHeap.getUsed();
78+
System.out.printf("Peak: %.3f MB%n", after / 1024.0 / 1024.0);
79+
80+
w.unsubscribe();
81+
82+
System.out.println("Wait before second GC");
83+
Thread.sleep(NewThreadWorker.PURGE_FREQUENCY + 2000);
84+
85+
System.out.println("Second GC");
86+
System.gc();
87+
88+
Thread.sleep(1000);
89+
90+
memHeap = memoryMXBean.getHeapMemoryUsage();
91+
long finish = memHeap.getUsed();
92+
System.out.printf("After: %.3f MB%n", finish / 1024.0 / 1024.0);
93+
94+
if (finish > initial * 5) {
95+
fail(String.format("Tasks retained: %.3f -> %.3f -> %.3f", initial / 1024 / 1024.0, after / 1024 / 1024.0, finish / 1024 / 1024d));
96+
}
97+
}
98+
99+
/** A simple executor which queues tasks and executes them one-by-one if executeOne() is called. */
100+
static final class TestExecutor implements Executor {
101+
final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<Runnable>();
102+
@Override
103+
public void execute(Runnable command) {
104+
queue.offer(command);
105+
}
106+
public void executeOne() {
107+
Runnable r = queue.poll();
108+
if (r != null) {
109+
r.run();
110+
}
111+
}
112+
public void executeAll() {
113+
Runnable r;
114+
while ((r = queue.poll()) != null) {
115+
r.run();
116+
}
117+
}
118+
}
119+
120+
@Test
121+
public void testCancelledTasksDontRun() {
122+
final AtomicInteger calls = new AtomicInteger();
123+
Action0 task = new Action0() {
124+
@Override
125+
public void call() {
126+
calls.getAndIncrement();
127+
}
128+
};
129+
TestExecutor exec = new TestExecutor();
130+
Scheduler custom = Schedulers.from(exec);
131+
Worker w = custom.createWorker();
132+
try {
133+
Subscription s1 = w.schedule(task);
134+
Subscription s2 = w.schedule(task);
135+
Subscription s3 = w.schedule(task);
136+
137+
s1.unsubscribe();
138+
s2.unsubscribe();
139+
s3.unsubscribe();
140+
141+
exec.executeAll();
142+
143+
assertEquals(0, calls.get());
144+
} finally {
145+
w.unsubscribe();
146+
}
147+
}
148+
@Test
149+
public void testCancelledWorkerDoesntRunTasks() {
150+
final AtomicInteger calls = new AtomicInteger();
151+
Action0 task = new Action0() {
152+
@Override
153+
public void call() {
154+
calls.getAndIncrement();
155+
}
156+
};
157+
TestExecutor exec = new TestExecutor();
158+
Scheduler custom = Schedulers.from(exec);
159+
Worker w = custom.createWorker();
160+
try {
161+
w.schedule(task);
162+
w.schedule(task);
163+
w.schedule(task);
164+
} finally {
165+
w.unsubscribe();
166+
}
167+
exec.executeAll();
168+
assertEquals(0, calls.get());
169+
}
170+
@Test
171+
public void testNoTimedTaskAfterScheduleRetention() throws InterruptedException {
172+
Executor e = new Executor() {
173+
@Override
174+
public void execute(Runnable command) {
175+
command.run();
176+
}
177+
};
178+
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();
179+
180+
w.schedule(Actions.empty(), 1, TimeUnit.MILLISECONDS);
181+
182+
assertTrue(w.tasks.hasSubscriptions());
183+
184+
Thread.sleep(100);
185+
186+
assertFalse(w.tasks.hasSubscriptions());
187+
}
188+
189+
@Test
190+
public void testNoTimedTaskPartRetention() {
191+
Executor e = new Executor() {
192+
@Override
193+
public void execute(Runnable command) {
194+
195+
}
196+
};
197+
ExecutorSchedulerWorker w = (ExecutorSchedulerWorker)Schedulers.from(e).createWorker();
198+
199+
Subscription s = w.schedule(Actions.empty(), 1, TimeUnit.DAYS);
200+
201+
assertTrue(w.tasks.hasSubscriptions());
202+
203+
s.unsubscribe();
204+
205+
assertFalse(w.tasks.hasSubscriptions());
206+
}
43207
}

0 commit comments

Comments
 (0)