Skip to content

Commit 99abbf9

Browse files
csabakosakarnokd
authored andcommitted
CachedThreadScheduler should wait until the previous action (if any) … (#4231)
* CachedThreadScheduler should wait until the previous action (if any) completes before releasing a worker to the pool. Fixes #4230. * Modified EventLoopWorker to implement Action0 and schedule itself to release. This saves an allocation.
1 parent cbec342 commit 99abbf9

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

src/main/java/rx/internal/schedulers/CachedThreadScheduler.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public Worker createWorker() {
174174
return new EventLoopWorker(pool.get());
175175
}
176176

177-
static final class EventLoopWorker extends Scheduler.Worker {
177+
static final class EventLoopWorker extends Scheduler.Worker implements Action0 {
178178
private final CompositeSubscription innerSubscription = new CompositeSubscription();
179179
private final CachedWorkerPool pool;
180180
private final ThreadWorker threadWorker;
@@ -190,11 +190,18 @@ static final class EventLoopWorker extends Scheduler.Worker {
190190
public void unsubscribe() {
191191
if (once.compareAndSet(false, true)) {
192192
// unsubscribe should be idempotent, so only do this once
193-
pool.release(threadWorker);
193+
194+
// Release the worker _after_ the previous action (if any) has completed
195+
threadWorker.schedule(this);
194196
}
195197
innerSubscription.unsubscribe();
196198
}
197199

200+
@Override
201+
public void call() {
202+
pool.release(threadWorker);
203+
}
204+
198205
@Override
199206
public boolean isUnsubscribed() {
200207
return innerSubscription.isUnsubscribed();

src/test/java/rx/schedulers/IoSchedulerTest.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import static org.junit.Assert.assertTrue;
2020

21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
2123
import org.junit.Test;
2224

2325
import rx.*;
@@ -83,4 +85,71 @@ public void testCancelledTaskRetention() throws InterruptedException {
8385
}
8486
}
8587

88+
// Tests that an uninterruptible worker does not get reused
89+
@Test(timeout = 10000)
90+
public void testUninterruptibleActionDoesNotBlockOtherAction() throws InterruptedException {
91+
final Worker uninterruptibleWorker = Schedulers.io().createWorker();
92+
final AtomicBoolean running = new AtomicBoolean(false);
93+
final AtomicBoolean shouldQuit = new AtomicBoolean(false);
94+
try {
95+
uninterruptibleWorker.schedule(new Action0() {
96+
@Override
97+
public void call() {
98+
synchronized (running) {
99+
running.set(true);
100+
running.notifyAll();
101+
}
102+
synchronized (shouldQuit) {
103+
while (!shouldQuit.get()) {
104+
try {
105+
shouldQuit.wait();
106+
} catch (final InterruptedException ignored) {
107+
}
108+
}
109+
}
110+
synchronized (running) {
111+
running.set(false);
112+
running.notifyAll();
113+
}
114+
}
115+
});
116+
117+
// Wait for the action to start executing
118+
synchronized (running) {
119+
while (!running.get()) {
120+
running.wait();
121+
}
122+
}
123+
} finally {
124+
uninterruptibleWorker.unsubscribe();
125+
}
126+
127+
final Worker otherWorker = Schedulers.io().createWorker();
128+
final AtomicBoolean otherActionRan = new AtomicBoolean(false);
129+
try {
130+
otherWorker.schedule(new Action0() {
131+
@Override
132+
public void call() {
133+
otherActionRan.set(true);
134+
}
135+
});
136+
Thread.sleep(1000); // give the action a chance to run
137+
} finally {
138+
otherWorker.unsubscribe();
139+
}
140+
141+
assertTrue(running.get()); // uninterruptible action keeps on running since InterruptedException is swallowed
142+
assertTrue(otherActionRan.get());
143+
144+
// Wait for uninterruptibleWorker to exit (to clean up after ourselves)
145+
synchronized (shouldQuit) {
146+
shouldQuit.set(true);
147+
shouldQuit.notifyAll();
148+
}
149+
synchronized (running) {
150+
while (running.get()) {
151+
running.wait();
152+
}
153+
}
154+
}
86155
}

0 commit comments

Comments
 (0)