Skip to content

1.x: replay request coordination reduce overhead #3470

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 15, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
277 changes: 166 additions & 111 deletions src/main/java/rx/internal/operators/OperatorReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import rx.*;
import rx.Observable;
import rx.exceptions.Exceptions;
import rx.exceptions.OnErrorThrowable;
import rx.exceptions.*;
import rx.functions.*;
import rx.internal.util.OpenHashSet;
import rx.observables.ConnectableObservable;
import rx.schedulers.Timestamped;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -303,8 +303,16 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
/** Indicates a terminated ReplaySubscriber. */
static final InnerProducer[] TERMINATED = new InnerProducer[0];

/** Tracks the subscribed producers. */
final AtomicReference<InnerProducer[]> producers;
/** Indicates no further InnerProducers are accepted. */
volatile boolean terminated;
/** Tracks the subscribed producers. Guarded by itself. */
final OpenHashSet<InnerProducer<T>> producers;
/** Contains a copy of the producers. Modified only from the source side. */
InnerProducer<T>[] producersCache;
/** Contains number of modifications to the producers set.*/
volatile long producersVersion;
/** Contains the number of modifications that the producersCache holds. */
long producersCacheVersion;
/**
* Atomically changed from false to true by connect to make sure the
* connection is only performed by one thread.
Expand All @@ -324,12 +332,19 @@ static final class ReplaySubscriber<T> extends Subscriber<T> implements Subscrip
/** The upstream producer. */
volatile Producer producer;

/** The queue that holds producers with request changes that need to be coordinated. */
List<InnerProducer<T>> coordinationQueue;
/** Indicate that all request amounts should be considered. */
boolean coordinateAll;

@SuppressWarnings("unchecked")
public ReplaySubscriber(AtomicReference<ReplaySubscriber<T>> current,
ReplayBuffer<T> buffer) {
this.buffer = buffer;

this.nl = NotificationLite.instance();
this.producers = new AtomicReference<InnerProducer[]>(EMPTY);
this.producers = new OpenHashSet<InnerProducer<T>>();
this.producersCache = EMPTY;
this.shouldConnect = new AtomicBoolean();
// make sure the source doesn't produce values until the child subscribers
// expressed their request amounts
Expand All @@ -340,7 +355,15 @@ void init() {
add(Subscriptions.create(new Action0() {
@Override
public void call() {
ReplaySubscriber.this.producers.getAndSet(TERMINATED);
if (!terminated) {
synchronized (producers) {
if (!terminated) {
producers.terminate();
producersVersion++;
terminated = true;
}
}
}
// unlike OperatorPublish, we can't null out the terminated so
// late subscribers can still get replay
// current.compareAndSet(ReplaySubscriber.this, null);
Expand All @@ -359,76 +382,34 @@ boolean add(InnerProducer<T> producer) {
if (producer == null) {
throw new NullPointerException();
}
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// get the current producer array
InnerProducer[] c = producers.get();
// if this subscriber-to-source reached a terminal state by receiving
// an onError or onCompleted, just refuse to add the new producer
if (c == TERMINATED) {
if (terminated) {
return false;
}
synchronized (producers) {
if (terminated) {
return false;
}
// we perform a copy-on-write logic
int len = c.length;
InnerProducer[] u = new InnerProducer[len + 1];
System.arraycopy(c, 0, u, 0, len);
u[len] = producer;
// try setting the producers array
if (producers.compareAndSet(c, u)) {
return true;
}
// if failed, some other operation succeeded (another add, remove or termination)
// so retry

producers.add(producer);
producersVersion++;
}
return true;
}

/**
* Atomically removes the given producer from the producers array.
* @param producer the producer to remove
*/
void remove(InnerProducer<T> producer) {
// the state can change so we do a CAS loop to achieve atomicity
for (;;) {
// let's read the current producers array
InnerProducer[] c = producers.get();
// if it is either empty or terminated, there is nothing to remove so we quit
if (c == EMPTY || c == TERMINATED) {
return;
}
// let's find the supplied producer in the array
// although this is O(n), we don't expect too many child subscribers in general
int j = -1;
int len = c.length;
for (int i = 0; i < len; i++) {
if (c[i].equals(producer)) {
j = i;
break;
}
}
// we didn't find it so just quit
if (j < 0) {
return;
}
// we do copy-on-write logic here
InnerProducer[] u;
// we don't create a new empty array if producer was the single inhabitant
// but rather reuse an empty array
if (len == 1) {
u = EMPTY;
} else {
// otherwise, create a new array one less in size
u = new InnerProducer[len - 1];
// copy elements being before the given producer
System.arraycopy(c, 0, u, 0, j);
// copy elements being after the given producer
System.arraycopy(c, j + 1, u, j, len - j - 1);
}
// try setting this new array as
if (producers.compareAndSet(c, u)) {
if (terminated) {
return;
}
synchronized (producers) {
if (terminated) {
return;
}
// if we failed, it means something else happened
// (a concurrent add/remove or termination), we need to retry
producers.remove(producer);
producersVersion++;
}
}

Expand All @@ -439,7 +420,7 @@ public void setProducer(Producer p) {
throw new IllegalStateException("Only a single producer can be set on a Subscriber.");
}
producer = p;
manageRequests();
manageRequests(null);
replay();
}

Expand Down Expand Up @@ -482,81 +463,157 @@ public void onCompleted() {
/**
* Coordinates the request amounts of various child Subscribers.
*/
void manageRequests() {
void manageRequests(InnerProducer<T> inner) {
// if the upstream has completed, no more requesting is possible
if (isUnsubscribed()) {
return;
}
synchronized (this) {
if (emitting) {
if (inner != null) {
List<InnerProducer<T>> q = coordinationQueue;
if (q == null) {
q = new ArrayList<InnerProducer<T>>();
coordinationQueue = q;
}
q.add(inner);
} else {
coordinateAll = true;
}
missed = true;
return;
}
emitting = true;
}

long ri = maxChildRequested;
long maxTotalRequested;

if (inner != null) {
maxTotalRequested = Math.max(ri, inner.totalRequested.get());
} else {
maxTotalRequested = ri;

InnerProducer<T>[] a = copyProducers();
for (InnerProducer<T> rp : a) {
if (rp != null) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
}

}
makeRequest(maxTotalRequested, ri);

for (;;) {
// if the upstream has completed, no more requesting is possible
if (isUnsubscribed()) {
return;
}

@SuppressWarnings("unchecked")
InnerProducer<T>[] a = producers.get();

long ri = maxChildRequested;
long maxTotalRequests = ri;

for (InnerProducer<T> rp : a) {
maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get());
List<InnerProducer<T>> q;
boolean all;
synchronized (this) {
if (!missed) {
emitting = false;
return;
}
missed = false;
q = coordinationQueue;
coordinationQueue = null;
all = coordinateAll;
coordinateAll = false;
}

long ur = maxUpstreamRequested;
Producer p = producer;
ri = maxChildRequested;
maxTotalRequested = ri;

long diff = maxTotalRequests - ri;
if (diff != 0) {
maxChildRequested = maxTotalRequests;
if (p != null) {
if (ur != 0L) {
maxUpstreamRequested = 0L;
p.request(ur + diff);
} else {
p.request(diff);
}
} else {
// collect upstream request amounts until there is a producer for them
long u = ur + diff;
if (u < 0) {
u = Long.MAX_VALUE;
if (q != null) {
for (InnerProducer<T> rp : q) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
}

if (all) {
InnerProducer<T>[] a = copyProducers();
for (InnerProducer<T> rp : a) {
if (rp != null) {
maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get());
}
maxUpstreamRequested = u;
}
} else
// if there were outstanding upstream requests and we have a producer
if (ur != 0L && p != null) {
maxUpstreamRequested = 0L;
// fire the accumulated requests
p.request(ur);
}

synchronized (this) {
if (!missed) {
emitting = false;
return;
makeRequest(maxTotalRequested, ri);
}
}

InnerProducer<T>[] copyProducers() {
synchronized (producers) {
Object[] a = producers.values();
int n = a.length;
@SuppressWarnings("unchecked")
InnerProducer<T>[] result = new InnerProducer[n];
System.arraycopy(a, 0, result, 0, n);
return result;
}
}

void makeRequest(long maxTotalRequests, long previousTotalRequests) {
long ur = maxUpstreamRequested;
Producer p = producer;

long diff = maxTotalRequests - previousTotalRequests;
if (diff != 0) {
maxChildRequested = maxTotalRequests;
if (p != null) {
if (ur != 0L) {
maxUpstreamRequested = 0L;
p.request(ur + diff);
} else {
p.request(diff);
}
missed = false;
} else {
// collect upstream request amounts until there is a producer for them
long u = ur + diff;
if (u < 0) {
u = Long.MAX_VALUE;
}
maxUpstreamRequested = u;
}
} else
// if there were outstanding upstream requests and we have a producer
if (ur != 0L && p != null) {
maxUpstreamRequested = 0L;
// fire the accumulated requests
p.request(ur);
}
}

/**
* Tries to replay the buffer contents to all known subscribers.
*/
@SuppressWarnings("unchecked")
void replay() {
@SuppressWarnings("unchecked")
InnerProducer<T>[] a = producers.get();
for (InnerProducer<T> rp : a) {
buffer.replay(rp);
InnerProducer<T>[] pc = producersCache;
if (producersCacheVersion != producersVersion) {
synchronized (producers) {
pc = producersCache;
// if the producers hasn't changed do nothing
// otherwise make a copy of the current set of producers
Object[] a = producers.values();
int n = a.length;
if (pc.length != n) {
pc = new InnerProducer[n];
producersCache = pc;
}
System.arraycopy(a, 0, pc, 0, n);
producersCacheVersion = producersVersion;
}
}
ReplayBuffer<T> b = buffer;
for (InnerProducer<T> rp : pc) {
if (rp != null) {
b.replay(rp);
}
}
}
}
Expand Down Expand Up @@ -635,7 +692,7 @@ public void request(long n) {
addTotalRequested(n);
// if successful, notify the parent dispatcher this child can receive more
// elements
parent.manageRequests();
parent.manageRequests(this);

parent.buffer.replay(this);
return;
Expand Down Expand Up @@ -716,7 +773,7 @@ public void unsubscribe() {
// let's assume this child had 0 requested before the unsubscription while
// the others had non-zero. By removing this 'blocking' child, the others
// are now free to receive events
parent.manageRequests();
parent.manageRequests(this);
}
}
}
Expand Down Expand Up @@ -856,8 +913,6 @@ public void replay(InnerProducer<T> output) {

/**
* Represents a node in a bounded replay buffer's linked list.
*
* @param <T> the contained value type
*/
static final class Node extends AtomicReference<Node> {
/** */
Expand Down
Loading