-
Notifications
You must be signed in to change notification settings - Fork 7.6k
WIP: OnSubscribeRefCount Synchronous #1754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,195 +15,83 @@ | |
*/ | ||
package rx.internal.operators; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.WeakHashMap; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
|
||
import rx.Observable.OnSubscribe; | ||
import rx.Subscriber; | ||
import rx.Subscription; | ||
import rx.exceptions.Exceptions; | ||
import rx.functions.Action0; | ||
import rx.functions.Action1; | ||
import rx.observables.ConnectableObservable; | ||
import rx.subscriptions.MultipleAssignmentSubscription; | ||
import rx.subscriptions.Subscriptions; | ||
|
||
/** | ||
* Returns an observable sequence that stays connected to the source as long | ||
* as there is at least one subscription to the observable sequence. | ||
* @param <T> the value type | ||
* Returns an observable sequence that stays connected to the source as long as | ||
* there is at least one subscription to the observable sequence. | ||
* | ||
* @param <T> | ||
* the value type | ||
*/ | ||
public final class OnSubscribeRefCount<T> implements OnSubscribe<T> { | ||
final ConnectableObservable<? extends T> source; | ||
final Object guard; | ||
/** Guarded by guard. */ | ||
int index; | ||
/** Guarded by guard. */ | ||
boolean emitting; | ||
/** Guarded by guard. If true, indicates a connection request, false indicates a disconnect request. */ | ||
List<Token> queue; | ||
/** Manipulated while in the serialized section. */ | ||
int count; | ||
/** Manipulated while in the serialized section. */ | ||
Subscription connection; | ||
/** Manipulated while in the serialized section. */ | ||
final Map<Token, Object> connectionStatus; | ||
/** Occupied indicator. */ | ||
private static final Object OCCUPIED = new Object(); | ||
|
||
private final ConnectableObservable<? extends T> source; | ||
private final AtomicReference<MultipleAssignmentSubscription> connectedSubscription = new AtomicReference<MultipleAssignmentSubscription>(); | ||
private final AtomicInteger subscriptionCount = new AtomicInteger(0); | ||
|
||
/** | ||
* Constructor. | ||
* | ||
* @param source | ||
* observable to apply ref count to | ||
*/ | ||
public OnSubscribeRefCount(ConnectableObservable<? extends T> source) { | ||
this.source = source; | ||
this.guard = new Object(); | ||
this.connectionStatus = new WeakHashMap<Token, Object>(); | ||
} | ||
|
||
@Override | ||
public void call(Subscriber<? super T> t1) { | ||
int id; | ||
synchronized (guard) { | ||
id = ++index; | ||
public void call(final Subscriber<? super T> subscriber) { | ||
if (subscriber.isUnsubscribed()) { | ||
return; | ||
} | ||
// always subscribe | ||
source.unsafeSubscribe(subscriber); | ||
|
||
int count = subscriptionCount.getAndIncrement(); | ||
if (count == 0) { | ||
// starting a new connection | ||
final MultipleAssignmentSubscription connectableSubscriptionHolder = new MultipleAssignmentSubscription(); | ||
connectedSubscription.set(connectableSubscriptionHolder); | ||
} | ||
final Token t = new Token(id); | ||
t1.add(Subscriptions.create(new Action0() { | ||
|
||
subscriber.add(Subscriptions.create(new Action0() { | ||
@Override | ||
public void call() { | ||
disconnect(t); | ||
} | ||
})); | ||
source.unsafeSubscribe(t1); | ||
connect(t); | ||
} | ||
private void connect(Token id) { | ||
List<Token> localQueue; | ||
synchronized (guard) { | ||
if (emitting) { | ||
if (queue == null) { | ||
queue = new ArrayList<Token>(); | ||
} | ||
queue.add(id); | ||
return; | ||
} | ||
|
||
localQueue = queue; | ||
queue = null; | ||
emitting = true; | ||
} | ||
boolean once = true; | ||
do { | ||
drain(localQueue); | ||
if (once) { | ||
once = false; | ||
doConnect(id); | ||
} | ||
synchronized (guard) { | ||
localQueue = queue; | ||
queue = null; | ||
if (localQueue == null) { | ||
emitting = false; | ||
return; | ||
} | ||
} | ||
} while (true); | ||
} | ||
private void disconnect(Token id) { | ||
List<Token> localQueue; | ||
synchronized (guard) { | ||
if (emitting) { | ||
if (queue == null) { | ||
queue = new ArrayList<Token>(); | ||
} | ||
queue.add(id.toDisconnect()); // negative value indicates disconnect | ||
return; | ||
} | ||
|
||
localQueue = queue; | ||
queue = null; | ||
emitting = true; | ||
} | ||
boolean once = true; | ||
do { | ||
drain(localQueue); | ||
if (once) { | ||
once = false; | ||
doDisconnect(id); | ||
} | ||
synchronized (guard) { | ||
localQueue = queue; | ||
queue = null; | ||
if (localQueue == null) { | ||
emitting = false; | ||
return; | ||
if (subscriptionCount.decrementAndGet() == 0) { | ||
// n->0 so we unsubscribe and disconnect | ||
connectedSubscription.get().unsubscribe(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not certain this will be okay when there are concurrent subscribe/unsubscribe/subscribe races. |
||
} | ||
} | ||
} while (true); | ||
} | ||
private void drain(List<Token> localQueue) { | ||
if (localQueue == null) { | ||
return; | ||
} | ||
int n = localQueue.size(); | ||
for (int i = 0; i < n; i++) { | ||
Token id = localQueue.get(i); | ||
if (id.isDisconnect()) { | ||
doDisconnect(id); | ||
} else { | ||
doConnect(id); | ||
} | ||
} | ||
} | ||
private void doConnect(Token id) { | ||
// this method is called only once per id | ||
// if add succeeds, id was not yet disconnected | ||
if (connectionStatus.put(id, OCCUPIED) == null) { | ||
if (count++ == 0) { | ||
connection = source.connect(); | ||
} | ||
} else { | ||
// connection exists due to disconnect, just remove | ||
connectionStatus.remove(id); | ||
} | ||
} | ||
private void doDisconnect(Token id) { | ||
// this method is called only once per id | ||
// if remove succeeds, id was connected | ||
if (connectionStatus.remove(id) != null) { | ||
if (--count == 0) { | ||
connection.unsubscribe(); | ||
connection = null; | ||
} | ||
} else { | ||
// mark id as if connected | ||
connectionStatus.put(id, OCCUPIED); | ||
} | ||
} | ||
/** Token that represens a connection request or a disconnection request. */ | ||
private static final class Token { | ||
final int id; | ||
public Token(int id) { | ||
this.id = id; | ||
} | ||
})); | ||
|
||
@Override | ||
public boolean equals(Object obj) { | ||
if (obj == null) { | ||
return false; | ||
} | ||
if (obj.getClass() != getClass()) { | ||
return false; | ||
// if we are incrementing from 0->1 we connect() | ||
if (count == 0) { | ||
try { | ||
source.connect(new Action1<Subscription>() { | ||
@Override | ||
public void call(final Subscription connectableSubscription) { | ||
connectedSubscription.get().set(connectableSubscription); | ||
} | ||
}); | ||
} catch (Throwable e) { | ||
// in case any error occurred we want to reset back to 0 so next attempt can retry connecting | ||
subscriptionCount.set(0); | ||
Exceptions.propagate(e); | ||
} | ||
int other = ((Token)obj).id; | ||
return id == other || -id == other; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return id < 0 ? -id : id; | ||
} | ||
public boolean isDisconnect() { | ||
return id < 0; | ||
} | ||
public Token toDisconnect() { | ||
if (id < 0) { | ||
return this; | ||
} | ||
return new Token(-id); | ||
} | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not thread-safe because 1 thread could be creating this subscription while a 2nd jumps ahead, registers its subscription then gets a null when it unsubscribes and blows up with an NPE all before the 1st thread finishes creating the MultipleAssignmentSubscription.
We either need locks or to use a State object with a single AtomicReference containing them all in a state machine.