Skip to content

Commit a92a077

Browse files
committed
Merge pull request #3760 from akarnokd/ErrorThrownIssue1685Again2
1.x: fix ExecutorScheduler and GenericScheduledExecutorService reorder bug
2 parents 00fdfaf + c36456a commit a92a077

File tree

3 files changed

+91
-25
lines changed

3 files changed

+91
-25
lines changed

src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,24 +31,29 @@
3131
* the work asynchronously on the appropriate {@link Scheduler} implementation. This means for example that you would not use this approach
3232
* along with {@link TrampolineScheduler} or {@link ImmediateScheduler}.
3333
*/
34-
public final class GenericScheduledExecutorService implements SchedulerLifecycle{
34+
public final class GenericScheduledExecutorService implements SchedulerLifecycle {
3535

3636
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
3737
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
3838

39-
private static final ScheduledExecutorService NONE;
39+
private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];
40+
41+
private static final ScheduledExecutorService SHUTDOWN;
4042
static {
41-
NONE = Executors.newScheduledThreadPool(0);
42-
NONE.shutdownNow();
43+
SHUTDOWN = Executors.newScheduledThreadPool(0);
44+
SHUTDOWN.shutdown();
4345
}
4446

4547
/* Schedulers needs acces to this in order to work with the lifecycle. */
4648
public final static GenericScheduledExecutorService INSTANCE = new GenericScheduledExecutorService();
4749

48-
private final AtomicReference<ScheduledExecutorService> executor;
50+
private final AtomicReference<ScheduledExecutorService[]> executor;
4951

52+
/** We don't use atomics with this because thread-assignment is random anyway. */
53+
private static int roundRobin;
54+
5055
private GenericScheduledExecutorService() {
51-
executor = new AtomicReference<ScheduledExecutorService>(NONE);
56+
executor = new AtomicReference<ScheduledExecutorService[]>(NONE);
5257
start();
5358
}
5459

@@ -63,39 +68,60 @@ public void start() {
6368
count = 8;
6469
}
6570

66-
ScheduledExecutorService exec = Executors.newScheduledThreadPool(count, THREAD_FACTORY);
67-
if (executor.compareAndSet(NONE, exec)) {
68-
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
69-
if (exec instanceof ScheduledThreadPoolExecutor) {
70-
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
71+
// A multi-threaded executor can reorder tasks, having a set of them
72+
// and handing one of those out on getInstance() ensures a proper order
73+
74+
ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
75+
for (int i = 0; i < count; i++) {
76+
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
77+
}
78+
if (executor.compareAndSet(NONE, execs)) {
79+
for (ScheduledExecutorService exec : execs) {
80+
if (!NewThreadWorker.tryEnableCancelPolicy(exec)) {
81+
if (exec instanceof ScheduledThreadPoolExecutor) {
82+
NewThreadWorker.registerExecutor((ScheduledThreadPoolExecutor)exec);
83+
}
7184
}
7285
}
7386
} else {
74-
exec.shutdownNow();
87+
for (ScheduledExecutorService exec : execs) {
88+
exec.shutdownNow();
89+
}
7590
}
7691
}
7792

7893
@Override
7994
public void shutdown() {
8095
for (;;) {
81-
ScheduledExecutorService exec = executor.get();
82-
if (exec == NONE) {
96+
ScheduledExecutorService[] execs = executor.get();
97+
if (execs == NONE) {
8398
return;
8499
}
85-
if (executor.compareAndSet(exec, NONE)) {
86-
NewThreadWorker.deregisterExecutor(exec);
87-
exec.shutdownNow();
100+
if (executor.compareAndSet(execs, NONE)) {
101+
for (ScheduledExecutorService exec : execs) {
102+
NewThreadWorker.deregisterExecutor(exec);
103+
exec.shutdownNow();
104+
}
88105
return;
89106
}
90107
}
91108
}
92109

93110
/**
94-
* See class Javadoc for information on what this is for and how to use.
111+
* Returns one of the single-threaded ScheduledExecutorService helper executors.
95112
*
96113
* @return {@link ScheduledExecutorService} for generic use.
97114
*/
98115
public static ScheduledExecutorService getInstance() {
99-
return INSTANCE.executor.get();
116+
ScheduledExecutorService[] execs = INSTANCE.executor.get();
117+
if (execs == NONE) {
118+
return SHUTDOWN;
119+
}
120+
int r = roundRobin + 1;
121+
if (r >= execs.length) {
122+
r = 0;
123+
}
124+
roundRobin = r;
125+
return execs[r];
100126
}
101127
}

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,14 @@ static final class ExecutorSchedulerWorker extends Scheduler.Worker implements R
5454
final ConcurrentLinkedQueue<ScheduledAction> queue;
5555
final AtomicInteger wip;
5656

57+
final ScheduledExecutorService service;
58+
5759
public ExecutorSchedulerWorker(Executor executor) {
5860
this.executor = executor;
5961
this.queue = new ConcurrentLinkedQueue<ScheduledAction>();
6062
this.wip = new AtomicInteger();
6163
this.tasks = new CompositeSubscription();
64+
this.service = GenericScheduledExecutorService.getInstance();
6265
}
6366

6467
@Override
@@ -108,12 +111,6 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
108111
if (isUnsubscribed()) {
109112
return Subscriptions.unsubscribed();
110113
}
111-
ScheduledExecutorService service;
112-
if (executor instanceof ScheduledExecutorService) {
113-
service = (ScheduledExecutorService)executor;
114-
} else {
115-
service = GenericScheduledExecutorService.getInstance();
116-
}
117114

118115
final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
119116
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package rx.internal.schedulers;
2+
3+
import java.util.concurrent.*;
4+
import java.util.concurrent.atomic.AtomicInteger;
5+
6+
import org.junit.*;
7+
8+
public class GenericScheduledExecutorServiceTest {
9+
@Test
10+
public void verifyInstanceIsSingleThreaded() throws Exception {
11+
ScheduledExecutorService exec = GenericScheduledExecutorService.getInstance();
12+
13+
final AtomicInteger state = new AtomicInteger();
14+
15+
final AtomicInteger found1 = new AtomicInteger();
16+
final AtomicInteger found2 = new AtomicInteger();
17+
18+
Future<?> f1 = exec.schedule(new Runnable() {
19+
@Override
20+
public void run() {
21+
try {
22+
Thread.sleep(250);
23+
} catch (InterruptedException e) {
24+
e.printStackTrace();
25+
}
26+
found1.set(state.getAndSet(1));
27+
}
28+
}, 250, TimeUnit.MILLISECONDS);
29+
Future<?> f2 = exec.schedule(new Runnable() {
30+
@Override
31+
public void run() {
32+
found2.set(state.getAndSet(2));
33+
}
34+
}, 250, TimeUnit.MILLISECONDS);
35+
36+
f1.get();
37+
f2.get();
38+
39+
Assert.assertEquals(2, state.get());
40+
Assert.assertEquals(0, found1.get());
41+
Assert.assertEquals(1, found2.get());
42+
}
43+
}

0 commit comments

Comments
 (0)