Skip to content

Commit 77f6a37

Browse files
committed
Experimental helper methods to submit actions to executors and specify
their interruptibility.
1 parent 8758fdd commit 77f6a37

File tree

7 files changed

+437
-31
lines changed

7 files changed

+437
-31
lines changed

src/main/java/rx/internal/schedulers/EventLoopsScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public Worker createWorker() {
9191
*/
9292
public Subscription scheduleDirect(Action0 action) {
9393
PoolWorker pw = pool.getEventLoop();
94-
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS);
94+
return pw.scheduleActual(action, -1, TimeUnit.NANOSECONDS, false);
9595
}
9696

9797
private static class EventLoopWorker extends Scheduler.Worker {
@@ -124,7 +124,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
124124
return Subscriptions.unsubscribed();
125125
}
126126

127-
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
127+
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit, false);
128128
innerSubscription.add(s);
129129
s.addParent(innerSubscription);
130130
return s;

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,19 +151,20 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
151151
if (isUnsubscribed) {
152152
return Subscriptions.unsubscribed();
153153
}
154-
return scheduleActual(action, delayTime, unit);
154+
return scheduleActual(action, delayTime, unit, true);
155155
}
156156

157157
/**
158-
* @warn javadoc missing
159-
* @param action
160-
* @param delayTime
161-
* @param unit
162-
* @return
158+
* Performs the actual scheduling of a potentially delayed task and assigns the
159+
* future to the ScheduledAction it returs.
160+
* @param action the action to schedule
161+
* @param delayTime the scheduling delay if positive
162+
* @param unit the scheduling delay's time unit
163+
* @return the ScheduledAction representing the task
163164
*/
164-
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
165+
public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit, boolean interruptOnUnsubscribe) {
165166
Action0 decoratedAction = schedulersHook.onSchedule(action);
166-
ScheduledAction run = new ScheduledAction(decoratedAction);
167+
ScheduledAction run = new ScheduledAction(decoratedAction, interruptOnUnsubscribe);
167168
Future<?> f;
168169
if (delayTime <= 0) {
169170
f = executor.submit(run);

src/main/java/rx/internal/schedulers/ScheduledAction.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616
package rx.internal.schedulers;
1717

1818
import java.util.concurrent.Future;
19-
import java.util.concurrent.atomic.AtomicBoolean;
20-
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.concurrent.atomic.*;
2120

2221
import rx.Subscription;
2322
import rx.exceptions.OnErrorNotImplementedException;
2423
import rx.functions.Action0;
24+
import rx.internal.util.SubscriptionList;
2525
import rx.plugins.RxJavaPlugins;
2626
import rx.subscriptions.CompositeSubscription;
2727

@@ -32,12 +32,20 @@
3232
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
3333
/** */
3434
private static final long serialVersionUID = -3962399486978279857L;
35-
final CompositeSubscription cancel;
35+
final SubscriptionList cancel;
3636
final Action0 action;
37+
volatile int interruptOnUnsubscribe;
38+
static final AtomicIntegerFieldUpdater<ScheduledAction> INTERRUPT_ON_UNSUBSCRIBE
39+
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "interruptOnUnsubscribe");
3740

3841
public ScheduledAction(Action0 action) {
42+
this(action, true);
43+
}
44+
45+
public ScheduledAction(Action0 action, boolean interruptOnUnsubscribe) {
3946
this.action = action;
40-
this.cancel = new CompositeSubscription();
47+
this.cancel = new SubscriptionList();
48+
this.interruptOnUnsubscribe = interruptOnUnsubscribe ? 1 : 0;
4149
}
4250

4351
@Override
@@ -61,16 +69,29 @@ public void run() {
6169
}
6270
}
6371

72+
/**
73+
* Sets the flag to indicate the underlying Future task should be interrupted on unsubscription or not.
74+
* @param interrupt the new interruptible status
75+
*/
76+
public void setInterruptOnUnsubscribe(boolean interrupt) {
77+
INTERRUPT_ON_UNSUBSCRIBE.lazySet(this, interrupt ? 1 : 0);
78+
}
79+
/**
80+
* Returns {@code true} if the underlying Future task will be interrupted on unsubscription.
81+
* @return the current interruptible status
82+
*/
83+
public boolean isInterruptOnUnsubscribe() {
84+
return interruptOnUnsubscribe != 0;
85+
}
86+
6487
@Override
6588
public boolean isUnsubscribed() {
6689
return cancel.isUnsubscribed();
6790
}
6891

6992
@Override
7093
public void unsubscribe() {
71-
if (!cancel.isUnsubscribed()) {
72-
cancel.unsubscribe();
73-
}
94+
cancel.unsubscribe();
7495
}
7596

7697
/**
@@ -89,7 +110,7 @@ public void add(Subscription s) {
89110
* @param f the future to add
90111
*/
91112
public void add(final Future<?> f) {
92-
cancel.add(new FutureCompleter(f));
113+
add(new FutureCompleter(f));
93114
}
94115

95116
/**
@@ -100,7 +121,7 @@ public void add(final Future<?> f) {
100121
* the parent {@code CompositeSubscription} to add
101122
*/
102123
public void addParent(CompositeSubscription parent) {
103-
cancel.add(new Remover(this, parent));
124+
add(new Remover(this, parent));
104125
}
105126

106127
/**
@@ -119,7 +140,7 @@ private FutureCompleter(Future<?> f) {
119140
@Override
120141
public void unsubscribe() {
121142
if (ScheduledAction.this.get() != Thread.currentThread()) {
122-
f.cancel(true);
143+
f.cancel(interruptOnUnsubscribe != 0);
123144
} else {
124145
f.cancel(false);
125146
}
@@ -155,4 +176,4 @@ public void unsubscribe() {
155176
}
156177

157178
}
158-
}
179+
}

src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
145145
return Subscriptions.unsubscribed();
146146
}
147147

148-
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
148+
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit, true);
149149
innerSubscription.add(s);
150150
s.addParent(innerSubscription);
151151
return s;

src/main/java/rx/schedulers/Schedulers.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,14 @@
1515
*/
1616
package rx.schedulers;
1717

18-
import rx.Scheduler;
19-
import rx.internal.schedulers.EventLoopsScheduler;
20-
import rx.plugins.RxJavaPlugins;
18+
import java.util.concurrent.*;
2119

22-
import java.util.concurrent.Executor;
20+
import rx.*;
21+
import rx.annotations.Experimental;
22+
import rx.functions.Action0;
23+
import rx.internal.schedulers.*;
24+
import rx.plugins.RxJavaPlugins;
25+
import rx.subscriptions.CompositeSubscription;
2326

2427
/**
2528
* Static factory methods for creating Schedulers.
@@ -137,4 +140,53 @@ public static TestScheduler test() {
137140
public static Scheduler from(Executor executor) {
138141
return new ExecutorScheduler(executor);
139142
}
140-
}
143+
/**
144+
* Submit an Action0 to the specified executor service with the option to interrupt the task
145+
* on unsubscription and add it to a parent composite subscription.
146+
* @param executor the target executor service
147+
* @param action the action to execute
148+
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
149+
* once the action completes or is unsubscribed.
150+
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
151+
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
152+
*/
153+
@Experimental
154+
public static Subscription submitTo(ExecutorService executor, Action0 action, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
155+
ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);
156+
157+
if (parent != null) {
158+
parent.add(sa);
159+
sa.addParent(parent);
160+
}
161+
162+
Future<?> f = executor.submit(sa);
163+
sa.add(f);
164+
165+
return sa;
166+
}
167+
/**
168+
* Submit an Action0 to the specified executor service with the given delay and the option to interrupt the task
169+
* on unsubscription and add it to a parent composite subscription.
170+
* @param executor the target executor service
171+
* @param action the action to execute
172+
* @param delay the delay value
173+
* @param unit the time unit of the delay value
174+
* @param parent if not {@code null} the subscription representing the action is added to this composite with logic to remove it
175+
* once the action completes or is unsubscribed.
176+
* @param interruptOnUnsubscribe if {@code false}, unsubscribing the task will not interrupt the task if it is running
177+
* @return the Subscription representing the scheduled action which is also added to the {@code parent} composite
178+
*/
179+
@Experimental
180+
public static Subscription submitTo(ScheduledExecutorService executor, Action0 action, long delay, TimeUnit unit, CompositeSubscription parent, boolean interruptOnUnsubscribe) {
181+
ScheduledAction sa = new ScheduledAction(action, interruptOnUnsubscribe);
182+
183+
if (parent != null) {
184+
parent.add(sa);
185+
sa.addParent(parent);
186+
}
187+
188+
Future<?> f = executor.schedule(sa, delay, unit);
189+
sa.add(f);
190+
191+
return sa;
192+
}}

0 commit comments

Comments
 (0)