Skip to content

1.x: fix ReplaySubject anomaly around caughtUp by removing that optimization #4051

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 44 additions & 55 deletions src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import rx.exceptions.Exceptions;
import rx.internal.operators.BackpressureUtils;
import rx.internal.util.RxJavaPluginUtils;
import rx.schedulers.Schedulers;

/**
* Subject that buffers all items it observes and replays them to any {@link Observer} that subscribes.
Expand Down Expand Up @@ -110,6 +111,24 @@ public static <T> ReplaySubject<T> create(int capacity) {
ReplayState<T> state = new ReplayState<T>(buffer);
return new ReplaySubject<T>(state);
}
/**
* Creates an unbounded replay subject with the time-bounded-implementation for testing purposes.
* <p>
* This variant behaves like the regular unbounded {@code ReplaySubject} created via {@link #create()} but
* uses the structures of the bounded-implementation. This is by no means intended for the replacement of
* the original, array-backed and unbounded {@code ReplaySubject} due to the additional overhead of the
* linked-list based internal buffer. The sole purpose is to allow testing and reasoning about the behavior
* of the bounded implementations without the interference of the eviction policies.
*
* @param <T>
* the type of items observed and emitted by the Subject
* @return the created subject
*/
/* public */ static <T> ReplaySubject<T> createUnboundedTime() {
ReplayBuffer<T> buffer = new ReplaySizeAndTimeBoundBuffer<T>(Integer.MAX_VALUE, Long.MAX_VALUE, Schedulers.immediate());
ReplayState<T> state = new ReplayState<T>(buffer);
return new ReplaySubject<T>(state);
}
/**
* Creates a size-bounded replay subject.
* <p>
Expand Down Expand Up @@ -431,14 +450,7 @@ public void onNext(T t) {

b.next(t);
for (ReplayProducer<T> rp : get()) {
if (rp.caughtUp) {
rp.actual.onNext(t);
} else {
if (b.drain(rp)) {
rp.caughtUp = true;
rp.node = null;
}
}
b.drain(rp);
}
}

Expand All @@ -451,14 +463,7 @@ public void onError(Throwable e) {
List<Throwable> errors = null;
for (ReplayProducer<T> rp : getAndSet(TERMINATED)) {
try {
if (rp.caughtUp) {
rp.actual.onError(e);
} else {
if (b.drain(rp)) {
rp.caughtUp = true;
rp.node = null;
}
}
b.drain(rp);
} catch (Throwable ex) {
if (errors == null) {
errors = new ArrayList<Throwable>();
Expand All @@ -477,14 +482,7 @@ public void onCompleted() {

b.complete();
for (ReplayProducer<T> rp : getAndSet(TERMINATED)) {
if (rp.caughtUp) {
rp.actual.onCompleted();
} else {
if (b.drain(rp)) {
rp.caughtUp = true;
rp.node = null;
}
}
b.drain(rp);
}
}

Expand All @@ -508,7 +506,7 @@ interface ReplayBuffer<T> {

void complete();

boolean drain(ReplayProducer<T> rp);
void drain(ReplayProducer<T> rp);

boolean isComplete();

Expand Down Expand Up @@ -585,9 +583,9 @@ public void complete() {
}

@Override
public boolean drain(ReplayProducer<T> rp) {
public void drain(ReplayProducer<T> rp) {
if (rp.getAndIncrement() != 0) {
return false;
return;
}

int missed = 1;
Expand All @@ -610,7 +608,7 @@ public boolean drain(ReplayProducer<T> rp) {
while (e != r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -624,7 +622,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}

if (empty) {
Expand All @@ -649,7 +647,7 @@ public boolean drain(ReplayProducer<T> rp) {
if (e == r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -663,7 +661,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}
}

Expand All @@ -679,7 +677,7 @@ public boolean drain(ReplayProducer<T> rp) {

missed = rp.addAndGet(-missed);
if (missed == 0) {
return r == Long.MAX_VALUE;
return;
}
}
}
Expand Down Expand Up @@ -799,9 +797,9 @@ public void complete() {
}

@Override
public boolean drain(ReplayProducer<T> rp) {
public void drain(ReplayProducer<T> rp) {
if (rp.getAndIncrement() != 0) {
return false;
return;
}

final Subscriber<? super T> a = rp.actual;
Expand All @@ -822,7 +820,7 @@ public boolean drain(ReplayProducer<T> rp) {
while (e != r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -837,7 +835,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}

if (empty) {
Expand All @@ -853,7 +851,7 @@ public boolean drain(ReplayProducer<T> rp) {
if (e == r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -867,7 +865,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}
}

Expand All @@ -881,7 +879,7 @@ public boolean drain(ReplayProducer<T> rp) {

missed = rp.addAndGet(-missed);
if (missed == 0) {
return r == Long.MAX_VALUE;
return;
}
}
}
Expand Down Expand Up @@ -1051,9 +1049,9 @@ TimedNode<T> latestHead() {
}

@Override
public boolean drain(ReplayProducer<T> rp) {
public void drain(ReplayProducer<T> rp) {
if (rp.getAndIncrement() != 0) {
return false;
return;
}

final Subscriber<? super T> a = rp.actual;
Expand All @@ -1074,7 +1072,7 @@ public boolean drain(ReplayProducer<T> rp) {
while (e != r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -1089,7 +1087,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}

if (empty) {
Expand All @@ -1105,7 +1103,7 @@ public boolean drain(ReplayProducer<T> rp) {
if (e == r) {
if (a.isUnsubscribed()) {
rp.node = null;
return false;
return;
}

boolean d = done;
Expand All @@ -1119,7 +1117,7 @@ public boolean drain(ReplayProducer<T> rp) {
} else {
a.onCompleted();
}
return false;
return;
}
}

Expand All @@ -1133,7 +1131,7 @@ public boolean drain(ReplayProducer<T> rp) {

missed = rp.addAndGet(-missed);
if (missed == 0) {
return r == Long.MAX_VALUE;
return;
}
}
}
Expand Down Expand Up @@ -1226,14 +1224,6 @@ static final class ReplayProducer<T>
/** Holds the back-reference to the replay state object. */
final ReplayState<T> state;

/**
* Indicates this Subscriber runs unbounded and the <b>source</b>-triggered
* buffer.drain() has emitted all available values.
* <p>
* This field has to be read and written from the source emitter's thread only.
*/
boolean caughtUp;

/**
* Unbounded buffer.drain() uses this field to remember the absolute index of
* values replayed to this Subscriber.
Expand Down Expand Up @@ -1276,6 +1266,5 @@ public void request(long n) {
throw new IllegalArgumentException("n >= required but it was " + n);
}
}

}
}
Loading