Skip to content

WIP: OnSubscribeRefCount Synchronous #1754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 54 additions & 166 deletions src/main/java/rx/internal/operators/OnSubscribeRefCount.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,195 +15,83 @@
*/
package rx.internal.operators;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observables.ConnectableObservable;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;

/**
* Returns an observable sequence that stays connected to the source as long
* as there is at least one subscription to the observable sequence.
* @param <T> the value type
* Returns an observable sequence that stays connected to the source as long as
* there is at least one subscription to the observable sequence.
*
* @param <T>
* the value type
*/
public final class OnSubscribeRefCount<T> implements OnSubscribe<T> {
final ConnectableObservable<? extends T> source;
final Object guard;
/** Guarded by guard. */
int index;
/** Guarded by guard. */
boolean emitting;
/** Guarded by guard. If true, indicates a connection request, false indicates a disconnect request. */
List<Token> queue;
/** Manipulated while in the serialized section. */
int count;
/** Manipulated while in the serialized section. */
Subscription connection;
/** Manipulated while in the serialized section. */
final Map<Token, Object> connectionStatus;
/** Occupied indicator. */
private static final Object OCCUPIED = new Object();

private final ConnectableObservable<? extends T> source;
private final AtomicReference<MultipleAssignmentSubscription> connectedSubscription = new AtomicReference<MultipleAssignmentSubscription>();
private final AtomicInteger subscriptionCount = new AtomicInteger(0);

/**
* Constructor.
*
* @param source
* observable to apply ref count to
*/
public OnSubscribeRefCount(ConnectableObservable<? extends T> source) {
this.source = source;
this.guard = new Object();
this.connectionStatus = new WeakHashMap<Token, Object>();
}

@Override
public void call(Subscriber<? super T> t1) {
int id;
synchronized (guard) {
id = ++index;
public void call(final Subscriber<? super T> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
// always subscribe
source.unsafeSubscribe(subscriber);

int count = subscriptionCount.getAndIncrement();
if (count == 0) {
// starting a new connection
final MultipleAssignmentSubscription connectableSubscriptionHolder = new MultipleAssignmentSubscription();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not thread-safe because 1 thread could be creating this subscription while a 2nd jumps ahead, registers its subscription then gets a null when it unsubscribes and blows up with an NPE all before the 1st thread finishes creating the MultipleAssignmentSubscription.

We either need locks or to use a State object with a single AtomicReference containing them all in a state machine.

connectedSubscription.set(connectableSubscriptionHolder);
}
final Token t = new Token(id);
t1.add(Subscriptions.create(new Action0() {

subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
disconnect(t);
}
}));
source.unsafeSubscribe(t1);
connect(t);
}
private void connect(Token id) {
List<Token> localQueue;
synchronized (guard) {
if (emitting) {
if (queue == null) {
queue = new ArrayList<Token>();
}
queue.add(id);
return;
}

localQueue = queue;
queue = null;
emitting = true;
}
boolean once = true;
do {
drain(localQueue);
if (once) {
once = false;
doConnect(id);
}
synchronized (guard) {
localQueue = queue;
queue = null;
if (localQueue == null) {
emitting = false;
return;
}
}
} while (true);
}
private void disconnect(Token id) {
List<Token> localQueue;
synchronized (guard) {
if (emitting) {
if (queue == null) {
queue = new ArrayList<Token>();
}
queue.add(id.toDisconnect()); // negative value indicates disconnect
return;
}

localQueue = queue;
queue = null;
emitting = true;
}
boolean once = true;
do {
drain(localQueue);
if (once) {
once = false;
doDisconnect(id);
}
synchronized (guard) {
localQueue = queue;
queue = null;
if (localQueue == null) {
emitting = false;
return;
if (subscriptionCount.decrementAndGet() == 0) {
// n->0 so we unsubscribe and disconnect
connectedSubscription.get().unsubscribe();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not certain this will be okay when there are concurrent subscribe/unsubscribe/subscribe races.

}
}
} while (true);
}
private void drain(List<Token> localQueue) {
if (localQueue == null) {
return;
}
int n = localQueue.size();
for (int i = 0; i < n; i++) {
Token id = localQueue.get(i);
if (id.isDisconnect()) {
doDisconnect(id);
} else {
doConnect(id);
}
}
}
private void doConnect(Token id) {
// this method is called only once per id
// if add succeeds, id was not yet disconnected
if (connectionStatus.put(id, OCCUPIED) == null) {
if (count++ == 0) {
connection = source.connect();
}
} else {
// connection exists due to disconnect, just remove
connectionStatus.remove(id);
}
}
private void doDisconnect(Token id) {
// this method is called only once per id
// if remove succeeds, id was connected
if (connectionStatus.remove(id) != null) {
if (--count == 0) {
connection.unsubscribe();
connection = null;
}
} else {
// mark id as if connected
connectionStatus.put(id, OCCUPIED);
}
}
/** Token that represens a connection request or a disconnection request. */
private static final class Token {
final int id;
public Token(int id) {
this.id = id;
}
}));

@Override
public boolean equals(Object obj) {
if (obj == null) {
return false;
}
if (obj.getClass() != getClass()) {
return false;
// if we are incrementing from 0->1 we connect()
if (count == 0) {
try {
source.connect(new Action1<Subscription>() {
@Override
public void call(final Subscription connectableSubscription) {
connectedSubscription.get().set(connectableSubscription);
}
});
} catch (Throwable e) {
// in case any error occurred we want to reset back to 0 so next attempt can retry connecting
subscriptionCount.set(0);
Exceptions.propagate(e);
}
int other = ((Token)obj).id;
return id == other || -id == other;
}

@Override
public int hashCode() {
return id < 0 ? -id : id;
}
public boolean isDisconnect() {
return id < 0;
}
public Token toDisconnect() {
if (id < 0) {
return this;
}
return new Token(-id);
}
}

}
10 changes: 8 additions & 2 deletions src/main/java/rx/internal/operators/OperatorMulticast.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ public void call() {
}));

// now that everything is hooked up let's subscribe
source.unsafeSubscribe(subscription);
// as long as the subscription is not null (which can happen if already unsubscribed)
boolean subscriptionIsNull;
synchronized(guard) {
subscriptionIsNull = subscription == null;
}
if (!subscriptionIsNull)
source.unsafeSubscribe(subscription);
}
}
}
}
Loading