Skip to content

Commit 4c16adf

Browse files
committed
Add "Subscriptions.unsubscribed" to fix the 'isUnsubscribed' issue
1 parent b02e572 commit 4c16adf

13 files changed

+66
-23
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7305,7 +7305,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
73057305
// TODO why aren't we throwing the hook's return value.
73067306
throw r;
73077307
}
7308-
return Subscriptions.empty();
7308+
return Subscriptions.unsubscribed();
73097309
}
73107310
}
73117311

@@ -7394,7 +7394,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
73947394
// TODO why aren't we throwing the hook's return value.
73957395
throw r;
73967396
}
7397-
return Subscriptions.empty();
7397+
return Subscriptions.unsubscribed();
73987398
}
73997399
}
74007400

src/main/java/rx/internal/operators/OperatorTimeoutWithSelector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ public Subscription call(
5151
} catch (Throwable t) {
5252
Exceptions.throwIfFatal(t);
5353
timeoutSubscriber.onError(t);
54-
return Subscriptions.empty();
54+
return Subscriptions.unsubscribed();
5555
}
5656
return o.unsafeSubscribe(new Subscriber<U>() {
5757

@@ -72,7 +72,7 @@ public void onNext(U t) {
7272

7373
});
7474
} else {
75-
return Subscriptions.empty();
75+
return Subscriptions.unsubscribed();
7676
}
7777
}
7878
}, new TimeoutStub<T>() {
@@ -87,7 +87,7 @@ public Subscription call(
8787
} catch (Throwable t) {
8888
Exceptions.throwIfFatal(t);
8989
timeoutSubscriber.onError(t);
90-
return Subscriptions.empty();
90+
return Subscriptions.unsubscribed();
9191
}
9292
return o.unsafeSubscribe(new Subscriber<V>() {
9393

src/main/java/rx/internal/schedulers/NewThreadWorker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Subscription schedule(final Action0 action) {
5858
@Override
5959
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
6060
if (isUnsubscribed) {
61-
return Subscriptions.empty();
61+
return Subscriptions.unsubscribed();
6262
}
6363
return scheduleActual(action, delayTime, unit);
6464
}

src/main/java/rx/schedulers/CachedThreadScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public Subscription schedule(Action0 action) {
141141
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
142142
if (innerSubscription.isUnsubscribed()) {
143143
// don't schedule, we are unsubscribed
144-
return Subscriptions.empty();
144+
return Subscriptions.unsubscribed();
145145
}
146146

147147
ScheduledAction s = threadWorker.scheduleActual(action, delayTime, unit);

src/main/java/rx/schedulers/EventLoopsScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ public Subscription schedule(Action0 action) {
9595
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
9696
if (innerSubscription.isUnsubscribed()) {
9797
// don't schedule, we are unsubscribed
98-
return Subscriptions.empty();
98+
return Subscriptions.unsubscribed();
9999
}
100100

101101
ScheduledAction s = poolWorker.scheduleActual(action, delayTime, unit);

src/main/java/rx/schedulers/ExecutorScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public ExecutorSchedulerWorker(Executor executor) {
7171
@Override
7272
public Subscription schedule(Action0 action) {
7373
if (isUnsubscribed()) {
74-
return Subscriptions.empty();
74+
return Subscriptions.unsubscribed();
7575
}
7676
ExecutorAction ea = new ExecutorAction(action, tasks);
7777
tasks.add(ea);
@@ -106,7 +106,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
106106
return schedule(action);
107107
}
108108
if (isUnsubscribed()) {
109-
return Subscriptions.empty();
109+
return Subscriptions.unsubscribed();
110110
}
111111
ScheduledExecutorService service;
112112
if (executor instanceof ScheduledExecutorService) {

src/main/java/rx/schedulers/ImmediateScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
5656
@Override
5757
public Subscription schedule(Action0 action) {
5858
action.call();
59-
return Subscriptions.empty();
59+
return Subscriptions.unsubscribed();
6060
}
6161

6262
@Override

src/main/java/rx/schedulers/TrampolineScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
6868

6969
private Subscription enqueue(Action0 action, long execTime) {
7070
if (innerSubscription.isUnsubscribed()) {
71-
return Subscriptions.empty();
71+
return Subscriptions.unsubscribed();
7272
}
7373
final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(TrampolineScheduler.this));
7474
queue.add(timedAction);
@@ -81,7 +81,7 @@ private Subscription enqueue(Action0 action, long execTime) {
8181
polled.action.call();
8282
}
8383
} while (wip.decrementAndGet() > 0);
84-
return Subscriptions.empty();
84+
return Subscriptions.unsubscribed();
8585
} else {
8686
// queue wasn't empty, a parent is already processing so we just add to the end of the queue
8787
return Subscriptions.create(new Action0() {

src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
*/
2727
public final class MultipleAssignmentSubscription implements Subscription {
2828
/** The shared empty state. */
29-
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
29+
static final State EMPTY_STATE = new State(false, Subscriptions.unsubscribed());
3030
volatile State state = EMPTY_STATE;
3131
static final AtomicReferenceFieldUpdater<MultipleAssignmentSubscription, State> STATE_UPDATER
3232
= AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state");

src/main/java/rx/subscriptions/RefCountSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public Subscription get() {
8080
do {
8181
oldState = state;
8282
if (oldState.isUnsubscribed) {
83-
return Subscriptions.empty();
83+
return Subscriptions.unsubscribed();
8484
} else {
8585
newState = oldState.addChild();
8686
}

src/main/java/rx/subscriptions/SerialSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
* the previous underlying subscription to be unsubscribed.
2525
*/
2626
public final class SerialSubscription implements Subscription {
27-
static final State EMPTY_STATE = new State(false, Subscriptions.empty());
27+
static final State EMPTY_STATE = new State(false, Subscriptions.unsubscribed());
2828
volatile State state = EMPTY_STATE;
2929
static final AtomicReferenceFieldUpdater<SerialSubscription, State> STATE_UPDATER
3030
= AtomicReferenceFieldUpdater.newUpdater(SerialSubscription.class, State.class, "state");

src/main/java/rx/subscriptions/Subscriptions.java

Lines changed: 34 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2020

2121
import rx.Subscription;
22+
import rx.annotations.Experimental;
2223
import rx.functions.Action0;
2324
import rx.functions.Actions;
2425

@@ -30,12 +31,38 @@ private Subscriptions() {
3031
throw new IllegalStateException("No instances!");
3132
}
3233
/**
33-
* Returns a {@link Subscription} that does nothing.
34-
*
35-
* @return a {@link Subscription} that does nothing
34+
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
35+
* <code>isUnsubscribed</code> to true. It's stateful and <code>isUnsubscribed</code>
36+
* indicates if <code>unsubscribe</code> is called, which is different from {@link #unsubscribed()}.
37+
*
38+
* <pre><code>
39+
* Subscription empty = Subscriptions.empty();
40+
* System.out.println(empty.isUnsubscribed()); // false
41+
* empty.unsubscribe();
42+
* System.out.println(empty.isUnsubscribed()); // true
43+
* </code></pre>
44+
*
45+
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing except changing
46+
* <code>isUnsubscribed</code> to true.
3647
*/
3748
public static Subscription empty() {
38-
return EMPTY;
49+
return BooleanSubscription.create();
50+
}
51+
52+
/**
53+
* Returns a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
54+
* Its <code>isUnsubscribed</code> always return true, which is different from {@link #empty()}.
55+
*
56+
* <pre><code>
57+
* Subscription unsubscribed = Subscriptions.unsubscribed();
58+
* System.out.println(unsubscribed.isUnsubscribed()); // true
59+
* </code></pre>
60+
*
61+
* @return a {@link Subscription} that <code>unsubscribe</code> does nothing but is already unsubscribed.
62+
*/
63+
@Experimental
64+
public static Subscription unsubscribed() {
65+
return UNSUBSCRIBED;
3966
}
4067

4168
/**
@@ -124,16 +151,16 @@ public static CompositeSubscription from(Subscription... subscriptions) {
124151
/**
125152
* A {@link Subscription} that does nothing when its unsubscribe method is called.
126153
*/
127-
private static final Empty EMPTY = new Empty();
154+
private static final Unsubscribed UNSUBSCRIBED = new Unsubscribed();
128155
/** Naming classes helps with debugging. */
129-
private static final class Empty implements Subscription {
156+
private static final class Unsubscribed implements Subscription {
130157
@Override
131158
public void unsubscribe() {
132159
}
133160

134161
@Override
135162
public boolean isUnsubscribed() {
136-
return false;
163+
return true;
137164
}
138165
}
139166
}

src/test/java/rx/subscriptions/SubscriptionsTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import static org.junit.Assert.assertFalse;
19+
import static org.junit.Assert.assertTrue;
1820
import static org.mockito.Mockito.mock;
1921
import static org.mockito.Mockito.times;
2022
import static org.mockito.Mockito.verify;
@@ -35,4 +37,18 @@ public void testUnsubscribeOnlyOnce() {
3537
subscription.unsubscribe();
3638
verify(unsubscribe, times(1)).call();
3739
}
40+
41+
@Test
42+
public void testEmpty() {
43+
Subscription empty = Subscriptions.empty();
44+
assertFalse(empty.isUnsubscribed());
45+
empty.unsubscribe();
46+
assertTrue(empty.isUnsubscribed());
47+
}
48+
49+
@Test
50+
public void testUnsubscribed() {
51+
Subscription unsubscribed = Subscriptions.unsubscribed();
52+
assertTrue(unsubscribed.isUnsubscribed());
53+
}
3854
}

0 commit comments

Comments
 (0)