Skip to content

Commit 2a8d6c7

Browse files
committed
1.x: fix RxRingBuffer-pool depending on the computation scheduler (#3924)
1 parent 1512c10 commit 2a8d6c7

File tree

1 file changed

+41
-31
lines changed

1 file changed

+41
-31
lines changed

src/main/java/rx/internal/util/ObjectPool.java

Lines changed: 41 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,16 @@
2121
import java.util.concurrent.*;
2222
import java.util.concurrent.atomic.AtomicReference;
2323

24-
import rx.Scheduler.Worker;
25-
import rx.functions.Action0;
26-
import rx.internal.schedulers.SchedulerLifecycle;
24+
import rx.internal.schedulers.*;
2725
import rx.internal.util.unsafe.*;
28-
import rx.schedulers.Schedulers;
2926

3027
public abstract class ObjectPool<T> implements SchedulerLifecycle {
3128
Queue<T> pool;
3229
final int minSize;
3330
final int maxSize;
3431
private final long validationInterval;
3532

36-
private final AtomicReference<Worker> schedulerWorker;
33+
private final AtomicReference<Future<?>> periodicTask;
3734

3835
public ObjectPool() {
3936
this(0, 0, 67);
@@ -55,7 +52,7 @@ private ObjectPool(final int min, final int max, final long validationInterval)
5552
this.minSize = min;
5653
this.maxSize = max;
5754
this.validationInterval = validationInterval;
58-
this.schedulerWorker = new AtomicReference<Worker>();
55+
this.periodicTask = new AtomicReference<Future<?>>();
5956
// initialize pool
6057
initialize(min);
6158

@@ -96,38 +93,51 @@ public void returnObject(T object) {
9693
*/
9794
@Override
9895
public void shutdown() {
99-
Worker w = schedulerWorker.getAndSet(null);
100-
if (w != null) {
101-
w.unsubscribe();
96+
Future<?> f = periodicTask.getAndSet(null);
97+
if (f != null) {
98+
f.cancel(false);
10299
}
103100
}
104101

105102
@Override
106103
public void start() {
107-
Worker w = Schedulers.computation().createWorker();
108-
if (schedulerWorker.compareAndSet(null, w)) {
109-
w.schedulePeriodically(new Action0() {
110-
111-
@Override
112-
public void call() {
113-
int size = pool.size();
114-
if (size < minSize) {
115-
int sizeToBeAdded = maxSize - size;
116-
for (int i = 0; i < sizeToBeAdded; i++) {
117-
pool.add(createObject());
118-
}
119-
} else if (size > maxSize) {
120-
int sizeToBeRemoved = size - maxSize;
121-
for (int i = 0; i < sizeToBeRemoved; i++) {
122-
// pool.pollLast();
123-
pool.poll();
104+
for (;;) {
105+
if (periodicTask.get() != null) {
106+
return;
107+
}
108+
ScheduledExecutorService w = GenericScheduledExecutorService.getInstance();
109+
110+
Future<?> f;
111+
try {
112+
f = w.scheduleAtFixedRate(new Runnable() {
113+
114+
@Override
115+
public void run() {
116+
int size = pool.size();
117+
if (size < minSize) {
118+
int sizeToBeAdded = maxSize - size;
119+
for (int i = 0; i < sizeToBeAdded; i++) {
120+
pool.add(createObject());
121+
}
122+
} else if (size > maxSize) {
123+
int sizeToBeRemoved = size - maxSize;
124+
for (int i = 0; i < sizeToBeRemoved; i++) {
125+
// pool.pollLast();
126+
pool.poll();
127+
}
124128
}
125129
}
126-
}
127-
128-
}, validationInterval, validationInterval, TimeUnit.SECONDS);
129-
} else {
130-
w.unsubscribe();
130+
131+
}, validationInterval, validationInterval, TimeUnit.SECONDS);
132+
} catch (RejectedExecutionException ex) {
133+
RxJavaPluginUtils.handleException(ex);
134+
break;
135+
}
136+
if (!periodicTask.compareAndSet(null, f)) {
137+
f.cancel(false);
138+
} else {
139+
break;
140+
}
131141
}
132142
}
133143

0 commit comments

Comments
 (0)