-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Scheduled action no interrupt #1898
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8d6a186
1f8389f
51d68ed
f0dbccb
af48fca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,9 @@ | |
*/ | ||
package rx.internal.schedulers; | ||
|
||
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; | ||
import java.util.concurrent.Future; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import rx.Subscription; | ||
import rx.exceptions.OnErrorNotImplementedException; | ||
|
@@ -27,12 +29,11 @@ | |
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the | ||
* {@code Subscriber} in respect of an {@code Observer}. | ||
*/ | ||
public final class ScheduledAction implements Runnable, Subscription { | ||
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription { | ||
/** */ | ||
private static final long serialVersionUID = -3962399486978279857L; | ||
final CompositeSubscription cancel; | ||
final Action0 action; | ||
volatile int once; | ||
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER | ||
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once"); | ||
|
||
public ScheduledAction(Action0 action) { | ||
this.action = action; | ||
|
@@ -42,6 +43,7 @@ public ScheduledAction(Action0 action) { | |
@Override | ||
public void run() { | ||
try { | ||
lazySet(Thread.currentThread()); | ||
action.call(); | ||
} catch (Throwable e) { | ||
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this | ||
|
@@ -66,21 +68,30 @@ public boolean isUnsubscribed() { | |
|
||
@Override | ||
public void unsubscribe() { | ||
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { | ||
if (!cancel.isUnsubscribed()) { | ||
cancel.unsubscribe(); | ||
} | ||
} | ||
|
||
/** | ||
* @warn javadoc missing | ||
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed | ||
* if the underlying {@code action} completes or the this scheduled action is cancelled. | ||
* | ||
* @param s | ||
* @warn param "s" undescribed | ||
* @param s the Subscription to add | ||
*/ | ||
public void add(Subscription s) { | ||
cancel.add(s); | ||
} | ||
|
||
/** | ||
* Adds the given Future to the unsubscription composite in order to support | ||
* cancelling the underlying task in the executor framework. | ||
* @param f the future to add | ||
*/ | ||
public void add(final Future<?> f) { | ||
cancel.add(new FutureCompleter(f)); | ||
} | ||
|
||
/** | ||
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is | ||
* cancelled or terminates, it can remove itself from this parent. | ||
|
@@ -92,13 +103,39 @@ public void addParent(CompositeSubscription parent) { | |
cancel.add(new Remover(this, parent)); | ||
} | ||
|
||
/** | ||
* Cancels the captured future if the caller of the call method | ||
* is not the same as the runner of the outer ScheduledAction to | ||
* prevent unnecessary self-interrupting if the unsubscription | ||
* happens from the same thread. | ||
*/ | ||
private final class FutureCompleter implements Subscription { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would the need for this go away if we stopped interrupting when we cancel a Take a look at this discussion: #1804 (comment) I question whether it is correct to interrupt by default when we cancel. It seems that should not be what we do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is the only way a blocking IO can be interrupted when run inside our schedulers. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But why shouldn't that be left to the Observable to add that interrupt support instead of us interrupting everything? Inside the Observable.create a developer can register Thread.interrupt to be done onUnsubscribe via Interrupting by default has caused issues at least twice now. The use case in #1804 was really odd and honestly not something we should ever have to be concerned with. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The problem with such Thread interrupt is to make sure the interrupt happens while the task is running, otherwise, you risk interrupting someone else's task; The java FutureTask and Executors have delicate logic to make sure the call to cancel is delivered only if the task is active. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's kind of why I'm thinking we should not be doing interrupts by default, for the same "delicate" reasons we've run into. |
||
private final Future<?> f; | ||
|
||
private FutureCompleter(Future<?> f) { | ||
this.f = f; | ||
} | ||
|
||
@Override | ||
public void unsubscribe() { | ||
if (ScheduledAction.this.get() != Thread.currentThread()) { | ||
f.cancel(true); | ||
} else { | ||
f.cancel(false); | ||
} | ||
} | ||
@Override | ||
public boolean isUnsubscribed() { | ||
return f.isCancelled(); | ||
} | ||
} | ||
|
||
/** Remove a child subscription from a composite when unsubscribing. */ | ||
private static final class Remover implements Subscription { | ||
private static final class Remover extends AtomicBoolean implements Subscription { | ||
/** */ | ||
private static final long serialVersionUID = 247232374289553518L; | ||
final Subscription s; | ||
final CompositeSubscription parent; | ||
volatile int once; | ||
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER | ||
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once"); | ||
|
||
public Remover(Subscription s, CompositeSubscription parent) { | ||
this.s = s; | ||
|
@@ -112,7 +149,7 @@ public boolean isUnsubscribed() { | |
|
||
@Override | ||
public void unsubscribe() { | ||
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { | ||
if (compareAndSet(false, true)) { | ||
parent.remove(s); | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't Atomic like the ONCE_UPDATER was, so are you just trusting the
cancel
Subscription to be idempotent in the event of a race?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I ran some profiling and saw cancel.unsubscribe() to be expensive. Adding this check reduced the time spent in unsubscribe() by half. Usually, there isn't any race but just the self cancellation and then when the ScheduledAction is removed from its parent's CompositeSubscription.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't understand. The ONCE_UPDATER would be sure we only call
unsubscribe
once, and it needs to be called at least once, so what performance benefit happened by removing ONCE_UPDATER that now allows it to possibly be called more than once?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is known the ScheduledAction.unsubscribe is called at least twice: once at the end of the run and a second time because the task is removed via CompositeSubscription.remove() from the parent scheduler. If the task terminates normally, the second call to unsubscribe will guaranteed to fail the CAS which has high cost anyways. Testing the cancel.isUnsubscribed() is effectively a volatile read with far less cost. In addition, it saved 4 bytes of memory per instance now usable by passing the thread around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the cost of a failed CAS compared with a volatile read?
I'm not arguing that testing isUnsubscribed() is cheap or expensive, just that it's not atomic. We could end up calling unsubscribe twice without the CAS check.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On X86, volatile read cost the same as any regular memory read. CAS is around 20-40 cycles. I've run a small program and measured always failing and always succeding CAS on my i7 4770 @ 3.5GHz and gave me ~5.2 ns in both cases, If I test the variable before CAS, I get 1.6ns for always fails and 8.8ns for always succeeding. So for 2 unsubscribe calls, we get 10.4ns in the CAS only case and 10.4ns with read-before-cas. However, any additional unsubscribe calls adds only 1.6ns with read-before-cas instead of 5.2ns. I must admit, the 2 unsubscribe calls costing the same time suprises me.
The unsubscribe() is still idempotent because either cancel.isUnsubscribed returns true and nothing happens or cancel.unsubscribe() is called which is idempotent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is great information! Do you think it would be valuable to convert this into a JMH test and commit it with the baseline benchmarks similar to https://github.com/ReactiveX/RxJava/blob/1.x/src/perf/java/rx/PerfBaseline.java along with a README of what costs are for these different things?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the part that I wasn't sure of, so if that is idempotent then there definitely is not a need for the extra ONCE_UPDATER. Thanks for walking me through this patiently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an exercise to learn JMH, sure. Otherwise, it would be a curiosity only.