Skip to content

Commit a813457

Browse files
committed
1.x: allow customizing GenericScheduledExecutorService via RxJavaHooks
1 parent cec8915 commit a813457

File tree

5 files changed

+133
-5
lines changed

5 files changed

+133
-5
lines changed

src/main/java/rx/internal/schedulers/GenericScheduledExecutorService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import java.util.concurrent.atomic.AtomicReference;
2020

2121
import rx.Scheduler;
22-
import rx.internal.util.RxThreadFactory;
2322

2423
/**
2524
* A default {@link ScheduledExecutorService} that can be used for scheduling actions when a {@link Scheduler} implementation doesn't have that ability.
@@ -32,9 +31,6 @@
3231
*/
3332
public final class GenericScheduledExecutorService implements SchedulerLifecycle {
3433

35-
private static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
36-
private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
37-
3834
private static final ScheduledExecutorService[] NONE = new ScheduledExecutorService[0];
3935

4036
private static final ScheduledExecutorService SHUTDOWN;
@@ -72,7 +68,7 @@ public void start() {
7268

7369
ScheduledExecutorService[] execs = new ScheduledExecutorService[count];
7470
for (int i = 0; i < count; i++) {
75-
execs[i] = Executors.newScheduledThreadPool(1, THREAD_FACTORY);
71+
execs[i] = GenericScheduledExecutorServiceFactory.create();
7672
}
7773
if (executor.compareAndSet(NONE, execs)) {
7874
for (ScheduledExecutorService exec : execs) {
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx.internal.schedulers;
2+
3+
import java.util.concurrent.*;
4+
5+
import rx.functions.Func0;
6+
import rx.internal.util.RxThreadFactory;
7+
import rx.plugins.RxJavaHooks;
8+
9+
/**
10+
* Utility class to create the individual ScheduledExecutorService instances for
11+
* the GenericScheduledExecutorService class.
12+
*/
13+
enum GenericScheduledExecutorServiceFactory {
14+
;
15+
16+
static final String THREAD_NAME_PREFIX = "RxScheduledExecutorPool-";
17+
static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);
18+
19+
static ThreadFactory threadFactory() {
20+
return THREAD_FACTORY;
21+
}
22+
23+
/**
24+
* Creates a ScheduledExecutorService (either the default or given by a hook).
25+
* @return the SchuduledExecutorService created.
26+
*/
27+
public static ScheduledExecutorService create() {
28+
Func0<? extends ScheduledExecutorService> f = RxJavaHooks.getOnGenericScheduledExecutorService();
29+
if (f == null) {
30+
return createDefault();
31+
}
32+
return f.call();
33+
}
34+
35+
36+
static ScheduledExecutorService createDefault() {
37+
return Executors.newScheduledThreadPool(1, threadFactory());
38+
}
39+
}

src/main/java/rx/plugins/RxJavaHooks.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.plugins;
1717

1818
import java.lang.Thread.UncaughtExceptionHandler;
19+
import java.util.concurrent.ScheduledExecutorService;
1920

2021
import rx.*;
2122
import rx.Completable.CompletableOnSubscribe;
@@ -67,6 +68,8 @@ public final class RxJavaHooks {
6768
static volatile Func1<Subscription, Subscription> onObservableReturn;
6869

6970
static volatile Func1<Subscription, Subscription> onSingleReturn;
71+
72+
static volatile Func0<? extends ScheduledExecutorService> onGenericScheduledExecutorService;
7073

7174
/** Initialize with the default delegation to the original RxJavaPlugins. */
7275
static {
@@ -175,6 +178,7 @@ public static void reset() {
175178
onComputationScheduler = null;
176179
onIOScheduler = null;
177180
onNewThreadScheduler = null;
181+
onGenericScheduledExecutorService = null;
178182
}
179183

180184
/**
@@ -204,6 +208,8 @@ public static void clear() {
204208
onComputationScheduler = null;
205209
onIOScheduler = null;
206210
onNewThreadScheduler = null;
211+
212+
onGenericScheduledExecutorService = null;
207213
}
208214

209215
/**
@@ -956,4 +962,34 @@ public CompletableOnSubscribe call(CompletableOnSubscribe f) {
956962
};
957963

958964
}
965+
/**
966+
* Sets the hook function for returning a ScheduledExecutorService used
967+
* by the GenericScheduledExecutorService for background tasks.
968+
* <p>
969+
* This operation is threadsafe.
970+
* <p>
971+
* Calling with a {@code null} parameter restores the default behavior:
972+
* create the default with {@link java.util.concurrent.Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)}.
973+
* <p>
974+
* For the changes to take effect, the Schedulers has to be restarted.
975+
* @param factory the supplier that is called when the GenericScheduledExecutorService
976+
* is (re)started
977+
*/
978+
public static void setOnGenericScheduledExecutorService(Func0<? extends ScheduledExecutorService> factory) {
979+
if (lockdown) {
980+
return;
981+
}
982+
onGenericScheduledExecutorService = factory;
983+
}
984+
985+
/**
986+
* Returns the current factory for creating ScheduledExecutorServices in
987+
* GenericScheduledExecutorService utility.
988+
* <p>
989+
* This operation is threadsafe.
990+
* @return the current factory function
991+
*/
992+
public static Func0<? extends ScheduledExecutorService> getOnGenericScheduledExecutorService() {
993+
return onGenericScheduledExecutorService;
994+
}
959995
}

src/test/java/rx/plugins/RxJavaHooksTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,12 @@ public void lockdown() throws Exception {
143143
RxJavaHooks.lockdown();
144144
try {
145145
Action1 a1 = Actions.empty();
146+
Func0 f0 = new Func0() {
147+
@Override
148+
public Object call() {
149+
return null;
150+
}
151+
};
146152
Func1 f1 = UtilityFunctions.identity();
147153
Func2 f2 = new Func2() {
148154
@Override
@@ -158,6 +164,9 @@ public Object call(Object t1, Object t2) {
158164

159165
Object before = getter.invoke(null);
160166

167+
if (m.getParameterTypes()[0].isAssignableFrom(Func0.class)) {
168+
m.invoke(null, f0);
169+
} else
161170
if (m.getParameterTypes()[0].isAssignableFrom(Func1.class)) {
162171
m.invoke(null, f1);
163172
} else
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package rx.schedulers;
2+
3+
import java.util.concurrent.*;
4+
5+
import org.junit.*;
6+
7+
import rx.functions.Func0;
8+
import rx.internal.schedulers.GenericScheduledExecutorService;
9+
import rx.plugins.RxJavaHooks;
10+
11+
public class GenericScheduledExecutorServiceTest {
12+
13+
@Test
14+
public void genericScheduledExecutorServiceHook() {
15+
// make sure the class is initialized
16+
Assert.assertNotNull(GenericScheduledExecutorService.class);
17+
18+
final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
19+
try {
20+
21+
RxJavaHooks.setOnGenericScheduledExecutorService(new Func0<ScheduledExecutorService>() {
22+
@Override
23+
public ScheduledExecutorService call() {
24+
return exec;
25+
}
26+
});
27+
28+
Schedulers.shutdown();
29+
// start() is package private so had to move this test here
30+
Schedulers.start();
31+
32+
Assert.assertSame(exec, GenericScheduledExecutorService.getInstance());
33+
34+
RxJavaHooks.setOnGenericScheduledExecutorService(null);
35+
36+
Schedulers.shutdown();
37+
// start() is package private so had to move this test here
38+
Schedulers.start();
39+
40+
Assert.assertNotSame(exec, GenericScheduledExecutorService.getInstance());
41+
42+
} finally {
43+
RxJavaHooks.reset();
44+
exec.shutdownNow();
45+
}
46+
47+
}
48+
}

0 commit comments

Comments
 (0)