Skip to content

Scheduled action no interrupt #1898

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Nov 29, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ScheduledAction scheduleActual(final Action0 action, long delayTime, Time
} else {
f = executor.schedule(run, delayTime, unit);
}
run.add(Subscriptions.from(f));
run.add(f);

return run;
}
Expand Down
65 changes: 51 additions & 14 deletions src/main/java/rx/internal/schedulers/ScheduledAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
*/
package rx.internal.schedulers;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
import rx.exceptions.OnErrorNotImplementedException;
Expand All @@ -27,12 +29,11 @@
* A {@code Runnable} that executes an {@code Action0} and can be cancelled. The analog is the
* {@code Subscriber} in respect of an {@code Observer}.
*/
public final class ScheduledAction implements Runnable, Subscription {
public final class ScheduledAction extends AtomicReference<Thread> implements Runnable, Subscription {
/** */
private static final long serialVersionUID = -3962399486978279857L;
final CompositeSubscription cancel;
final Action0 action;
volatile int once;
static final AtomicIntegerFieldUpdater<ScheduledAction> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(ScheduledAction.class, "once");

public ScheduledAction(Action0 action) {
this.action = action;
Expand All @@ -42,6 +43,7 @@ public ScheduledAction(Action0 action) {
@Override
public void run() {
try {
lazySet(Thread.currentThread());
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
Expand All @@ -66,21 +68,30 @@ public boolean isUnsubscribed() {

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
if (!cancel.isUnsubscribed()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't Atomic like the ONCE_UPDATER was, so are you just trusting the cancel Subscription to be idempotent in the event of a race?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I ran some profiling and saw cancel.unsubscribe() to be expensive. Adding this check reduced the time spent in unsubscribe() by half. Usually, there isn't any race but just the self cancellation and then when the ScheduledAction is removed from its parent's CompositeSubscription.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand. The ONCE_UPDATER would be sure we only call unsubscribe once, and it needs to be called at least once, so what performance benefit happened by removing ONCE_UPDATER that now allows it to possibly be called more than once?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is known the ScheduledAction.unsubscribe is called at least twice: once at the end of the run and a second time because the task is removed via CompositeSubscription.remove() from the parent scheduler. If the task terminates normally, the second call to unsubscribe will guaranteed to fail the CAS which has high cost anyways. Testing the cancel.isUnsubscribed() is effectively a volatile read with far less cost. In addition, it saved 4 bytes of memory per instance now usable by passing the thread around.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the cost of a failed CAS compared with a volatile read?

I'm not arguing that testing isUnsubscribed() is cheap or expensive, just that it's not atomic. We could end up calling unsubscribe twice without the CAS check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On X86, volatile read cost the same as any regular memory read. CAS is around 20-40 cycles. I've run a small program and measured always failing and always succeding CAS on my i7 4770 @ 3.5GHz and gave me ~5.2 ns in both cases, If I test the variable before CAS, I get 1.6ns for always fails and 8.8ns for always succeeding. So for 2 unsubscribe calls, we get 10.4ns in the CAS only case and 10.4ns with read-before-cas. However, any additional unsubscribe calls adds only 1.6ns with read-before-cas instead of 5.2ns. I must admit, the 2 unsubscribe calls costing the same time suprises me.

The unsubscribe() is still idempotent because either cancel.isUnsubscribed returns true and nothing happens or cancel.unsubscribe() is called which is idempotent.

public class CasFailureTest {
    public static void main(String[] args) {
        AtomicInteger v = new AtomicInteger(0);
//      AtomicInteger v = new AtomicInteger(1);

        int k = 100_000_000;
        for (int i = 0; i < 10; i++) {
            long n = System.nanoTime();
            for (int j = 0; j < k; j++) {
                if (v.get() == j) {
                    v.compareAndSet(j, j + 1);
                }
//              if (v.get() == 0)
//                  v.compareAndSet(0, 1);
            }
            System.out.println(1d * (System.nanoTime() - n) / k);
            v.set(0);
        }
    }
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is great information! Do you think it would be valuable to convert this into a JMH test and commit it with the baseline benchmarks similar to https://github.com/ReactiveX/RxJava/blob/1.x/src/perf/java/rx/PerfBaseline.java along with a README of what costs are for these different things?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or cancel.unsubscribe() is called which is idempotent.

This is the part that I wasn't sure of, so if that is idempotent then there definitely is not a need for the extra ONCE_UPDATER. Thanks for walking me through this patiently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an exercise to learn JMH, sure. Otherwise, it would be a curiosity only.

cancel.unsubscribe();
}
}

/**
* @warn javadoc missing
* Adds a general Subscription to this {@code ScheduledAction} that will be unsubscribed
* if the underlying {@code action} completes or the this scheduled action is cancelled.
*
* @param s
* @warn param "s" undescribed
* @param s the Subscription to add
*/
public void add(Subscription s) {
cancel.add(s);
}

/**
* Adds the given Future to the unsubscription composite in order to support
* cancelling the underlying task in the executor framework.
* @param f the future to add
*/
public void add(final Future<?> f) {
cancel.add(new FutureCompleter(f));
}

/**
* Adds a parent {@link CompositeSubscription} to this {@code ScheduledAction} so when the action is
* cancelled or terminates, it can remove itself from this parent.
Expand All @@ -92,13 +103,39 @@ public void addParent(CompositeSubscription parent) {
cancel.add(new Remover(this, parent));
}

/**
* Cancels the captured future if the caller of the call method
* is not the same as the runner of the outer ScheduledAction to
* prevent unnecessary self-interrupting if the unsubscription
* happens from the same thread.
*/
private final class FutureCompleter implements Subscription {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would the need for this go away if we stopped interrupting when we cancel a Future?

Take a look at this discussion: #1804 (comment)

I question whether it is correct to interrupt by default when we cancel. It seems that should not be what we do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is the only way a blocking IO can be interrupted when run inside our schedulers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But why shouldn't that be left to the Observable to add that interrupt support instead of us interrupting everything?

Inside the Observable.create a developer can register Thread.interrupt to be done onUnsubscribe via add.

Interrupting by default has caused issues at least twice now. The use case in #1804 was really odd and honestly not something we should ever have to be concerned with.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with such Thread interrupt is to make sure the interrupt happens while the task is running, otherwise, you risk interrupting someone else's task; The java FutureTask and Executors have delicate logic to make sure the call to cancel is delivered only if the task is active.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's kind of why I'm thinking we should not be doing interrupts by default, for the same "delicate" reasons we've run into.

private final Future<?> f;

private FutureCompleter(Future<?> f) {
this.f = f;
}

@Override
public void unsubscribe() {
if (ScheduledAction.this.get() != Thread.currentThread()) {
f.cancel(true);
} else {
f.cancel(false);
}
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}

/** Remove a child subscription from a composite when unsubscribing. */
private static final class Remover implements Subscription {
private static final class Remover extends AtomicBoolean implements Subscription {
/** */
private static final long serialVersionUID = 247232374289553518L;
final Subscription s;
final CompositeSubscription parent;
volatile int once;
static final AtomicIntegerFieldUpdater<Remover> ONCE_UPDATER
= AtomicIntegerFieldUpdater.newUpdater(Remover.class, "once");

public Remover(Subscription s, CompositeSubscription parent) {
this.s = s;
Expand All @@ -112,7 +149,7 @@ public boolean isUnsubscribed() {

@Override
public void unsubscribe() {
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
if (compareAndSet(false, true)) {
parent.remove(s);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/perf/java/rx/schedulers/ComputationSchedulerPerf.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class ComputationSchedulerPerf {
@State(Scope.Thread)
public static class Input extends InputWithIncrementingInteger {

@Param({ "1", "1000", "1000000" })
@Param({ "1", "10", "100", "1000", "2000", "3000", "4000", "10000", "100000", "1000000" })
public int size;

@Override
Expand Down
48 changes: 48 additions & 0 deletions src/test/java/rx/schedulers/NewThreadSchedulerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,18 @@

package rx.schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import junit.framework.Assert;

import org.junit.Test;

import rx.Scheduler;
import rx.functions.Action0;
import rx.internal.schedulers.ScheduledAction;
import rx.subscriptions.Subscriptions;

public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests {

Expand All @@ -35,4 +45,42 @@ public final void testUnhandledErrorIsDeliveredToThreadHandler() throws Interrup
public final void testHandledErrorIsNotDeliveredToThreadHandler() throws InterruptedException {
SchedulerTests.testHandledErrorIsNotDeliveredToThreadHandler(getScheduler());
}
@Test(timeout = 3000)
public void testNoSelfInterrupt() throws InterruptedException {
Scheduler.Worker worker = Schedulers.newThread().createWorker();
try {
final CountDownLatch run = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
final AtomicBoolean interruptFlag = new AtomicBoolean();

ScheduledAction sa = (ScheduledAction)worker.schedule(new Action0() {
@Override
public void call() {
try {
run.await();
} catch (InterruptedException ex) {
exception.set(ex);
}
}
});

sa.add(Subscriptions.create(new Action0() {
@Override
public void call() {
interruptFlag.set(Thread.currentThread().isInterrupted());
done.countDown();
}
}));

run.countDown();

done.await();

Assert.assertEquals(null, exception.get());
Assert.assertFalse("Interrupted?!", interruptFlag.get());
} finally {
worker.unsubscribe();
}
}
}