Skip to content

Commit cd0301b

Browse files
authored
1.x: compact MultipleAssignment- and Serial-Subscriptions (#4328)
* 1.x: compact MultipleAssignment- and Serial-Subscriptions * Update javadoc - mention unsubscription
1 parent d0b3bee commit cd0301b

File tree

6 files changed

+269
-124
lines changed

6 files changed

+269
-124
lines changed

src/main/java/rx/Scheduler.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@
1818
import java.util.concurrent.TimeUnit;
1919

2020
import rx.annotations.Experimental;
21-
import rx.functions.Action0;
22-
import rx.functions.Func1;
21+
import rx.functions.*;
2322
import rx.internal.schedulers.SchedulerWhen;
23+
import rx.internal.subscriptions.SequentialSubscription;
2424
import rx.schedulers.Schedulers;
25-
import rx.subscriptions.MultipleAssignmentSubscription;
2625

2726
/**
2827
* A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
@@ -126,7 +125,9 @@ public Subscription schedulePeriodically(final Action0 action, long initialDelay
126125
final long firstNowNanos = TimeUnit.MILLISECONDS.toNanos(now());
127126
final long firstStartInNanos = firstNowNanos + unit.toNanos(initialDelay);
128127

129-
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
128+
final SequentialSubscription first = new SequentialSubscription();
129+
final SequentialSubscription mas = new SequentialSubscription(first);
130+
130131
final Action0 recursiveAction = new Action0() {
131132
long count;
132133
long lastNowNanos = firstNowNanos;
@@ -155,14 +156,11 @@ public void call() {
155156
lastNowNanos = nowNanos;
156157

157158
long delay = nextTick - nowNanos;
158-
mas.set(schedule(this, delay, TimeUnit.NANOSECONDS));
159+
mas.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
159160
}
160161
}
161162
};
162-
MultipleAssignmentSubscription s = new MultipleAssignmentSubscription();
163-
// Should call `mas.set` before `schedule`, or the new Subscription may replace the old one.
164-
mas.set(s);
165-
s.set(schedule(recursiveAction, initialDelay, unit));
163+
first.replace(schedule(recursiveAction, initialDelay, unit));
166164
return mas;
167165
}
168166

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.subscriptions;
17+
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import rx.Subscription;
21+
import rx.subscriptions.Subscriptions;
22+
23+
/**
24+
* A container of a Subscription that supports operations of SerialSubscription
25+
* and MultipleAssignmentSubscription via methods (update, replace) and extends
26+
* AtomicReference to reduce allocation count (beware the API leak of AtomicReference!).
27+
* @since 1.1.9
28+
*/
29+
public final class SequentialSubscription extends AtomicReference<Subscription> implements Subscription {
30+
31+
/** */
32+
private static final long serialVersionUID = 995205034283130269L;
33+
34+
/**
35+
* Create an empty SequentialSubscription.
36+
*/
37+
public SequentialSubscription() {
38+
39+
}
40+
41+
/**
42+
* Create a SequentialSubscription with the given initial Subscription.
43+
* @param initial the initial Subscription, may be null
44+
*/
45+
public SequentialSubscription(Subscription initial) {
46+
lazySet(initial);
47+
}
48+
49+
/**
50+
* Returns the current contained Subscription (may be null).
51+
* <p>(Remark: named as such because get() is final).
52+
* @return the current contained Subscription (may be null)
53+
*/
54+
public Subscription current() {
55+
Subscription current = super.get();
56+
if (current == Unsubscribed.INSTANCE) {
57+
return Subscriptions.unsubscribed();
58+
}
59+
return current;
60+
}
61+
62+
/**
63+
* Atomically sets the contained Subscription to the provided next value and unsubscribes
64+
* the previous value or unsubscribes the next value if this container is unsubscribed.
65+
* <p>(Remark: named as such because set() is final).
66+
* @param next the next Subscription to contain, may be null
67+
* @return true if the update succeded, false if the container was unsubscribed
68+
*/
69+
public boolean update(Subscription next) {
70+
for (;;) {
71+
Subscription current = get();
72+
73+
if (current == Unsubscribed.INSTANCE) {
74+
if (next != null) {
75+
next.unsubscribe();
76+
}
77+
return false;
78+
}
79+
80+
if (compareAndSet(current, next)) {
81+
if (current != null) {
82+
current.unsubscribe();
83+
}
84+
return true;
85+
}
86+
}
87+
}
88+
89+
/**
90+
* Atomically replaces the contained Subscription to the provided next value but
91+
* does not unsubscribe the previous value or unsubscribes the next value if this
92+
* container is unsubscribed.
93+
* @param next the next Subscription to contain, may be null
94+
* @return true if the update succeded, false if the container was unsubscribed
95+
*/
96+
public boolean replace(Subscription next) {
97+
for (;;) {
98+
Subscription current = get();
99+
100+
if (current == Unsubscribed.INSTANCE) {
101+
if (next != null) {
102+
next.unsubscribe();
103+
}
104+
return false;
105+
}
106+
107+
if (compareAndSet(current, next)) {
108+
return true;
109+
}
110+
}
111+
}
112+
113+
/**
114+
* Atomically tries to set the contained Subscription to the provided next value and unsubscribes
115+
* the previous value or unsubscribes the next value if this container is unsubscribed.
116+
* <p>
117+
* Unlike {@link #update(Subscription)}, this doesn't retry if the replace failed
118+
* because a concurrent operation changed the underlying contained object.
119+
* @param next the next Subscription to contain, may be null
120+
* @return true if the update succeded, false if the container was unsubscribed
121+
*/
122+
public boolean updateWeak(Subscription next) {
123+
Subscription current = get();
124+
if (current == Unsubscribed.INSTANCE) {
125+
if (next != null) {
126+
next.unsubscribe();
127+
}
128+
return false;
129+
}
130+
if (compareAndSet(current, next)) {
131+
return true;
132+
}
133+
134+
current = get();
135+
136+
if (next != null) {
137+
next.unsubscribe();
138+
}
139+
return current == Unsubscribed.INSTANCE;
140+
}
141+
142+
/**
143+
* Atomically tries to replace the contained Subscription to the provided next value but
144+
* does not unsubscribe the previous value or unsubscribes the next value if this container
145+
* is unsubscribed.
146+
* <p>
147+
* Unlike {@link #replace(Subscription)}, this doesn't retry if the replace failed
148+
* because a concurrent operation changed the underlying contained object.
149+
* @param next the next Subscription to contain, may be null
150+
* @return true if the update succeded, false if the container was unsubscribed
151+
*/
152+
public boolean replaceWeak(Subscription next) {
153+
Subscription current = get();
154+
if (current == Unsubscribed.INSTANCE) {
155+
if (next != null) {
156+
next.unsubscribe();
157+
}
158+
return false;
159+
}
160+
if (compareAndSet(current, next)) {
161+
return true;
162+
}
163+
164+
current = get();
165+
if (current == Unsubscribed.INSTANCE) {
166+
if (next != null) {
167+
next.unsubscribe();
168+
}
169+
return false;
170+
}
171+
return true;
172+
}
173+
174+
@Override
175+
public void unsubscribe() {
176+
Subscription current = get();
177+
if (current != Unsubscribed.INSTANCE) {
178+
current = getAndSet(Unsubscribed.INSTANCE);
179+
if (current != null && current != Unsubscribed.INSTANCE) {
180+
current.unsubscribe();
181+
}
182+
}
183+
}
184+
185+
@Override
186+
public boolean isUnsubscribed() {
187+
return get() == Unsubscribed.INSTANCE;
188+
}
189+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.subscriptions;
17+
18+
import rx.Subscription;
19+
20+
/**
21+
* Represents an unsubscribed Subscription via a singleton; don't leak it!
22+
*/
23+
public enum Unsubscribed implements Subscription {
24+
INSTANCE;
25+
26+
@Override
27+
public boolean isUnsubscribed() {
28+
return true;
29+
}
30+
31+
@Override
32+
public void unsubscribe() {
33+
// deliberately ignored
34+
}
35+
}

src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java

Lines changed: 7 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -15,56 +15,25 @@
1515
*/
1616
package rx.subscriptions;
1717

18-
import java.util.concurrent.atomic.AtomicReference;
19-
20-
import rx.Observable;
21-
import rx.Subscription;
18+
import rx.*;
19+
import rx.internal.subscriptions.SequentialSubscription;
2220

2321
/**
2422
* Subscription that can be checked for status such as in a loop inside an {@link Observable} to exit the loop
2523
* if unsubscribed.
2624
*/
2725
public final class MultipleAssignmentSubscription implements Subscription {
2826

29-
final AtomicReference<State> state = new AtomicReference<State>(new State(false, Subscriptions.empty()));
27+
final SequentialSubscription state = new SequentialSubscription();
3028

31-
static final class State {
32-
final boolean isUnsubscribed;
33-
final Subscription subscription;
34-
35-
State(boolean u, Subscription s) {
36-
this.isUnsubscribed = u;
37-
this.subscription = s;
38-
}
39-
40-
State unsubscribe() {
41-
return new State(true, subscription);
42-
}
43-
44-
State set(Subscription s) {
45-
return new State(isUnsubscribed, s);
46-
}
47-
48-
}
4929
@Override
5030
public boolean isUnsubscribed() {
51-
return state.get().isUnsubscribed;
31+
return state.isUnsubscribed();
5232
}
5333

5434
@Override
5535
public void unsubscribe() {
56-
State oldState;
57-
State newState;
58-
final AtomicReference<State> localState = this.state;
59-
do {
60-
oldState = localState.get();
61-
if (oldState.isUnsubscribed) {
62-
return;
63-
} else {
64-
newState = oldState.unsubscribe();
65-
}
66-
} while (!localState.compareAndSet(oldState, newState));
67-
oldState.subscription.unsubscribe();
36+
state.unsubscribe();
6837
}
6938

7039
/**
@@ -78,18 +47,7 @@ public void set(Subscription s) {
7847
if (s == null) {
7948
throw new IllegalArgumentException("Subscription can not be null");
8049
}
81-
State oldState;
82-
State newState;
83-
final AtomicReference<State> localState = this.state;
84-
do {
85-
oldState = localState.get();
86-
if (oldState.isUnsubscribed) {
87-
s.unsubscribe();
88-
return;
89-
} else {
90-
newState = oldState.set(s);
91-
}
92-
} while (!localState.compareAndSet(oldState, newState));
50+
state.replace(s);
9351
}
9452

9553
/**
@@ -98,7 +56,6 @@ public void set(Subscription s) {
9856
* @return the {@link Subscription} that underlies the {@code MultipleAssignmentSubscription}
9957
*/
10058
public Subscription get() {
101-
return state.get().subscription;
59+
return state.current();
10260
}
103-
10461
}

0 commit comments

Comments
 (0)