Skip to content

Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue #1950

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 13, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7305,7 +7305,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}

Expand Down Expand Up @@ -7394,7 +7394,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
// TODO why aren't we throwing the hook's return value.
throw r;
}
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public Subscription call(
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
timeoutSubscriber.onError(t);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return o.unsafeSubscribe(new Subscriber<U>() {

Expand All @@ -72,7 +72,7 @@ public void onNext(U t) {

});
} else {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
}
}, new TimeoutStub<T>() {
Expand All @@ -87,7 +87,7 @@ public Subscription call(
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
timeoutSubscriber.onError(t);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return o.unsafeSubscribe(new Subscriber<V>() {

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/schedulers/NewThreadWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public Subscription schedule(final Action0 action) {
@Override
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
if (isUnsubscribed) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
return scheduleActual(action, delayTime, unit);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/CachedThreadScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public Subscription schedule(Action0 action) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/EventLoopsScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public Subscription schedule(Action0 action) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (innerSubscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/schedulers/ExecutorScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ExecutorSchedulerWorker(Executor executor) {
@Override
public Subscription schedule(Action0 action) {
if (isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
ExecutorAction ea = new ExecutorAction(action, tasks);
tasks.add(ea);
Expand Down Expand Up @@ -106,7 +106,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
return schedule(action);
}
if (isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
ScheduledExecutorService service;
if (executor instanceof ScheduledExecutorService) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/schedulers/ImmediateScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
@Override
public Subscription schedule(Action0 action) {
action.call();
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/rx/schedulers/TrampolineScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

private Subscription enqueue(Action0 action, long execTime) {
if (innerSubscription.isUnsubscribed()) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
}
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
queue.add(timedAction);
Expand All @@ -81,7 +81,7 @@ private Subscription enqueue(Action0 action, long execTime) {
polled.action.call();
}
} while (wip.decrementAndGet() > 0);
return Subscriptions.empty();
return Subscriptions.unsubscribed();
} else {
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
return Subscriptions.create(new Action0() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*/
public final class MultipleAssignmentSubscription implements Subscription {
/** The shared empty state. */
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
static final State EMPTY_STATE = new State(false, Subscriptions.unsubscribed());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this one start as unsubscribed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subscriptions.unsubscribed() will be returned to the user when MultipleAssignmentSubscription is unsubscribed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wrong. It should be Subscriptions.empty.

volatile State state = EMPTY_STATE;
static final AtomicReferenceFieldUpdater<MultipleAssignmentSubscription, State> STATE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state");
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/subscriptions/RefCountSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public Subscription get() {
do {
oldState = state;
if (oldState.isUnsubscribed) {
return Subscriptions.empty();
return Subscriptions.unsubscribed();
} else {
newState = oldState.addChild();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/rx/subscriptions/SerialSubscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* the previous underlying subscription to be unsubscribed.
*/
public final class SerialSubscription implements Subscription {
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
static final State EMPTY_STATE = new State(false, Subscriptions.unsubscribed());
volatile State state = EMPTY_STATE;
static final AtomicReferenceFieldUpdater<SerialSubscription, State> STATE_UPDATER
= AtomicReferenceFieldUpdater.newUpdater(SerialSubscription.class, State.class, "state");
Expand Down
41 changes: 34 additions & 7 deletions src/main/java/rx/subscriptions/Subscriptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.Subscription;
import rx.annotations.Experimental;
import rx.functions.Action0;
import rx.functions.Actions;

Expand All @@ -30,12 +31,38 @@ private Subscriptions() {
throw new IllegalStateException("No instances!");
}
/**
* Returns a {@link Subscription} that does nothing.
*
* @return a {@link Subscription} that does nothing
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
* <code>isUnsubscribed</code> to true. It's stateful and <code>isUnsubscribed</code>
* indicates if <code>unsubscribe</code> is called, which is different from {@link #unsubscribed()}.
*
* <pre><code>
* Subscription empty = Subscriptions.empty();
* System.out.println(empty.isUnsubscribed()); // false
* empty.unsubscribe();
* System.out.println(empty.isUnsubscribed()); // true
* </code></pre>
*
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
* <code>isUnsubscribed</code> to true.
*/
public static Subscription empty() {
return EMPTY;
return BooleanSubscription.create();
}

/**
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
* Its <code>isUnsubscribed</code> always return true, which is different from {@link #empty()}.
*
* <pre><code>
* Subscription unsubscribed = Subscriptions.unsubscribed();
* System.out.println(unsubscribed.isUnsubscribed()); // true
* </code></pre>
*
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
*/
@Experimental
public static Subscription unsubscribed() {
return UNSUBSCRIBED;
}

/**
Expand Down Expand Up @@ -124,16 +151,16 @@ public static CompositeSubscription from(Subscription... subscriptions) {
/**
* A {@link Subscription} that does nothing when its unsubscribe method is called.
*/
private static final Empty EMPTY = new Empty();
private static final Unsubscribed UNSUBSCRIBED = new Unsubscribed();
/** Naming classes helps with debugging. */
private static final class Empty implements Subscription {
private static final class Unsubscribed implements Subscription {
@Override
public void unsubscribe() {
}

@Override
public boolean isUnsubscribed() {
return false;
return true;
}
}
}
16 changes: 16 additions & 0 deletions src/test/java/rx/subscriptions/SubscriptionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package rx.subscriptions;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -35,4 +37,18 @@ public void testUnsubscribeOnlyOnce() {
subscription.unsubscribe();
verify(unsubscribe, times(1)).call();
}

@Test
public void testEmpty() {
Subscription empty = Subscriptions.empty();
assertFalse(empty.isUnsubscribed());
empty.unsubscribe();
assertTrue(empty.isUnsubscribed());
}

@Test
public void testUnsubscribed() {
Subscription unsubscribed = Subscriptions.unsubscribed();
assertTrue(unsubscribed.isUnsubscribed());
}
}