Skip to content

1.x: compact MultipleAssignment- and Serial-Subscriptions #4328

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
import java.util.concurrent.TimeUnit;

import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.functions.Func1;
import rx.functions.*;
import rx.internal.schedulers.SchedulerWhen;
import rx.internal.subscriptions.SequentialSubscription;
import rx.schedulers.Schedulers;
import rx.subscriptions.MultipleAssignmentSubscription;

/**
* A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
Expand Down Expand Up @@ -126,7 +125,9 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);

final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
final SequentialSubscription first = new SequentialSubscription();
final SequentialSubscription mas = new SequentialSubscription(first);

final Action0 recursiveAction = new Action0() {
long count;
long lastNowNanos = firstNowNanos;
Expand Down Expand Up @@ -155,14 +156,11 @@ public void call() {
lastNowNanos = nowNanos;

long delay = nextTick - nowNanos;
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
};
MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
// Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
mas.set(s);
s.set(schedule(recursiveAction, initialDelay, unit));
first.replace(schedule(recursiveAction, initialDelay, unit));
return mas;
}

Expand Down
189 changes: 189 additions & 0 deletions src/main/java/rx/internal/subscriptions/SequentialSubscription.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.subscriptions;

import java.util.concurrent.atomic.AtomicReference;

import rx.Subscription;
import rx.subscriptions.Subscriptions;

/**
* A container of a Subscription that supports operations of SerialSubscription
* and MultipleAssignmentSubscription via methods (update, replace) and extends
* AtomicReference to reduce allocation count (beware the API leak of AtomicReference!).
* @since 1.1.9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: do we really need this for internal classes?

*/
public final class SequentialSubscription extends AtomicReference<Subscription> implements Subscription {

/** */
private static final long serialVersionUID = 995205034283130269L;

/**
* Create an empty SequentialSubscription.
*/
public SequentialSubscription() {

}

/**
* Create a SequentialSubscription with the given initial Subscription.
* @param initial the initial Subscription, may be null
*/
public SequentialSubscription(Subscription initial) {
lazySet(initial);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to prefer this over super(initial)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The super constructor uses a full barrier volatile write.

}

/**
* Returns the current contained Subscription (may be null).
* <p>(Remark: named as such because get() is final).
* @return the current contained Subscription (may be null)
*/
public Subscription current() {
Subscription current = super.get();
if (current == Unsubscribed.INSTANCE) {
return Subscriptions.unsubscribed();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make Subscriptions.unsubscribed() return Unsubscribed.INSTANCE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or you want them to be different objects?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unsubscribed.INSTANCE is an internal-only marker instance. If it were to leak into the public API it could be set on one of these subscriptions which would then think it was already unsubscribed (even though it wasn't).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great if its name somehow show that it's internal only so we could pay more attention to leaking it during code reviews

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The package name and javadoc indicates it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, though they're ~invisible during code review

}
return current;
}

/**
* Atomically sets the contained Subscription to the provided next value and unsubscribes
* the previous value or unsubscribes the next value if this container is unsubscribed.
* <p>(Remark: named as such because set() is final).
* @param next the next Subscription to contain, may be null
* @return true if the update succeded, false if the container was unsubscribed
*/
public boolean update(Subscription next) {
for (;;) {
Subscription current = get();

if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}

if (compareAndSet(current, next)) {
if (current != null) {
current.unsubscribe();
}
return true;
}
}
}

/**
* Atomically replaces the contained Subscription to the provided next value but
* does not unsubscribe the previous value or unsubscribes the next value if this
* container is unsubscribed.
* @param next the next Subscription to contain, may be null
* @return true if the update succeded, false if the container was unsubscribed
*/
public boolean replace(Subscription next) {
for (;;) {
Subscription current = get();

if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}

if (compareAndSet(current, next)) {
return true;
}
}
}

/**
* Atomically tries to set the contained Subscription to the provided next value and unsubscribes
* the previous value or unsubscribes the next value if this container is unsubscribed.
* <p>
* Unlike {@link #update(Subscription)}, this doesn't retry if the replace failed
* because a concurrent operation changed the underlying contained object.
* @param next the next Subscription to contain, may be null
* @return true if the update succeded, false if the container was unsubscribed
*/
public boolean updateWeak(Subscription next) {
Subscription current = get();
if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}
if (compareAndSet(current, next)) {
return true;
}

current = get();

if (next != null) {
next.unsubscribe();
}
return current == Unsubscribed.INSTANCE;
}

/**
* Atomically tries to replace the contained Subscription to the provided next value but
* does not unsubscribe the previous value or unsubscribes the next value if this container
* is unsubscribed.
* <p>
* Unlike {@link #replace(Subscription)}, this doesn't retry if the replace failed
* because a concurrent operation changed the underlying contained object.
* @param next the next Subscription to contain, may be null
* @return true if the update succeded, false if the container was unsubscribed
*/
public boolean replaceWeak(Subscription next) {
Subscription current = get();
if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}
if (compareAndSet(current, next)) {
return true;
}

current = get();
if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}
return true;
}

@Override
public void unsubscribe() {
Subscription current = get();
if (current != Unsubscribed.INSTANCE) {
current = getAndSet(Unsubscribed.INSTANCE);
if (current != null && current != Unsubscribed.INSTANCE) {
current.unsubscribe();
}
}
}

@Override
public boolean isUnsubscribed() {
return get() == Unsubscribed.INSTANCE;
}
}
35 changes: 35 additions & 0 deletions src/main/java/rx/internal/subscriptions/Unsubscribed.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.subscriptions;

import rx.Subscription;

/**
* Represents an unsubscribed Subscription via a singleton; don't leak it!
*/
public enum Unsubscribed implements Subscription {
INSTANCE;

@Override
public boolean isUnsubscribed() {
return true;
}

@Override
public void unsubscribe() {
// deliberately ignored
}
}
57 changes: 7 additions & 50 deletions src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,25 @@
*/
package rx.subscriptions;

import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Subscription;
import rx.*;
import rx.internal.subscriptions.SequentialSubscription;

/**
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop
* if unsubscribed.
*/
public final class MultipleAssignmentSubscription implements Subscription {

final AtomicReference<State> state = new AtomicReference<State>(new State(false, Subscriptions.empty()));
final SequentialSubscription state = new SequentialSubscription();

static final class State {
final boolean isUnsubscribed;
final Subscription subscription;

State(boolean u, Subscription s) {
this.isUnsubscribed = u;
this.subscription = s;
}

State unsubscribe() {
return new State(true, subscription);
}

State set(Subscription s) {
return new State(isUnsubscribed, s);
}

}
@Override
public boolean isUnsubscribed() {
return state.get().isUnsubscribed;
return state.isUnsubscribed();
}

@Override
public void unsubscribe() {
State oldState;
State newState;
final AtomicReference<State> localState = this.state;
do {
oldState = localState.get();
if (oldState.isUnsubscribed) {
return;
} else {
newState = oldState.unsubscribe();
}
} while (!localState.compareAndSet(oldState, newState));
oldState.subscription.unsubscribe();
state.unsubscribe();
}

/**
Expand All @@ -78,18 +47,7 @@ public void set(Subscription s) {
if (s == null) {
throw new IllegalArgumentException("Subscription can not be null");
}
State oldState;
State newState;
final AtomicReference<State> localState = this.state;
do {
oldState = localState.get();
if (oldState.isUnsubscribed) {
s.unsubscribe();
return;
} else {
newState = oldState.set(s);
}
} while (!localState.compareAndSet(oldState, newState));
state.replace(s);
}

/**
Expand All @@ -98,7 +56,6 @@ public void set(Subscription s) {
* @return the {@link Subscription} that underlies the {@code MultipleAssignmentSubscription}
*/
public Subscription get() {
return state.get().subscription;
return state.current();
}

}
Loading