Skip to content

Commit b3a0446

Browse files
authored
1.x: Schedulers.from() to call RxJavaHooks.onScheduleAction (#4311)
1 parent 27c782d commit b3a0446

File tree

2 files changed

+52
-6
lines changed

2 files changed

+52
-6
lines changed

src/main/java/rx/internal/schedulers/ExecutorScheduler.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ public Subscription schedule(Action0 action) {
6464
if (isUnsubscribed()) {
6565
return Subscriptions.unsubscribed();
6666
}
67+
68+
action = RxJavaHooks.onScheduledAction(action);
69+
6770
ScheduledAction ea = new ScheduledAction(action, tasks);
6871
tasks.add(ea);
6972
queue.offer(ea);
@@ -111,14 +114,16 @@ public void run() {
111114
}
112115

113116
@Override
114-
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
117+
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
115118
if (delayTime <= 0) {
116119
return schedule(action);
117120
}
118121
if (isUnsubscribed()) {
119122
return Subscriptions.unsubscribed();
120123
}
121124

125+
final Action0 decorated = RxJavaHooks.onScheduledAction(action);
126+
122127
final MultipleAssignmentSubscription first = new MultipleAssignmentSubscription();
123128
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
124129
mas.set(first);
@@ -137,7 +142,7 @@ public void call() {
137142
return;
138143
}
139144
// schedule the real action untimed
140-
Subscription s2 = schedule(action);
145+
Subscription s2 = schedule(decorated);
141146
mas.set(s2);
142147
// unless the worker is unsubscribed, we should get a new ScheduledAction
143148
if (s2.getClass() == ScheduledAction.class) {

src/test/java/rx/internal/schedulers/ExecutorSchedulerTest.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@
1919

2020
import java.util.concurrent.*;
2121
import java.util.concurrent.atomic.AtomicInteger;
22-
import org.junit.Test;
22+
23+
import org.junit.*;
24+
2325
import rx.*;
2426
import rx.Scheduler.Worker;
2527
import rx.functions.*;
2628
import rx.internal.schedulers.ExecutorScheduler.ExecutorSchedulerWorker;
2729
import rx.internal.util.RxThreadFactory;
28-
import rx.schedulers.AbstractSchedulerConcurrencyTests;
29-
import rx.schedulers.SchedulerTests;
30-
import rx.schedulers.Schedulers;
30+
import rx.plugins.RxJavaHooks;
31+
import rx.schedulers.*;
3132

3233
public class ExecutorSchedulerTest extends AbstractSchedulerConcurrencyTests {
3334

@@ -208,4 +209,44 @@ public void call() {
208209

209210
assertFalse(w.tasks.hasSubscriptions());
210211
}
212+
213+
@Test
214+
public void actionHookCalled() throws Exception {
215+
ExecutorService exec = Executors.newSingleThreadExecutor();
216+
try {
217+
final int[] call = { 0 };
218+
219+
RxJavaHooks.setOnScheduleAction(new Func1<Action0, Action0>() {
220+
@Override
221+
public Action0 call(Action0 t) {
222+
call[0]++;
223+
return t;
224+
}
225+
});
226+
227+
Scheduler s = Schedulers.from(exec);
228+
229+
Worker w = s.createWorker();
230+
231+
final CountDownLatch cdl = new CountDownLatch(1);
232+
233+
try {
234+
w.schedule(new Action0() {
235+
@Override
236+
public void call() {
237+
cdl.countDown();
238+
}
239+
});
240+
241+
Assert.assertTrue("Action timed out", cdl.await(5, TimeUnit.SECONDS));
242+
} finally {
243+
w.unsubscribe();
244+
}
245+
246+
Assert.assertEquals("Hook not called!", 1, call[0]);
247+
} finally {
248+
RxJavaHooks.reset();
249+
exec.shutdown();
250+
}
251+
}
211252
}

0 commit comments

Comments
 (0)