Skip to content

Fixed first emission racing with pre and post subscription. #1947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 97 additions & 6 deletions src/main/java/rx/subjects/ReplaySubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,42 @@ public void call(SubjectObserver<T> o) {
o.index(lastIndex);
}
};
ssm.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
synchronized (o) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this synchronization is doing. We don't synchronize the onNext so why would synchronization here help ensure no race?

Shouldn't the caughtUp function be used to handle this use case on the first onNext to handle any data between onStart and onAdded?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't. A subscriber arrives before the onNext and thus not yet visible for the caughtUp, subscriber gets registered, and not they sit doing nothing in the original version. One has to do a post-subscription check to see if new value arrived since the pre-subscription. BehaviorSubject does this as well and it is a startup window.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A subscriber arrives before the onNext and thus not yet visible for the caughtUp

That would mean it doesn't receive the onNext and it should receive that value on the next event. That sounds pretty normal for a natural race condition like this. Why wouldn't the next onNext or terminal event take care of catching up?

And what is the synchronized doing? I don't see how it is ever synchronizing between threads since onAdded would only be invoked once and we don't synchronize inside onNext.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now imagine that the Subject receives an onNext and onCompleted in quick succession; if the subscription is delayed between onStart and onAdd, the subscriber will never be notified as there can't be any further events to trigger caughtUp.

Synchronized resolves the race for the replayObserver: it protects the first indicator. If the subscribing thread gets in there first, it starts to replay existing and incoming events until it gets some "break" and the regular caughtUp can pick up. If the emitter thread gets in there first, it will behave as a regular caughtUp and the subscriber thread does nothing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so it's replayObserver and onAdded that are being synchronized. That's the connection I wasn't making. Thanks.

if (!o.first || o.emitting) {
return;
}
o.first = false;
o.emitting = true;
}
boolean skipFinal = false;
try {
for (;;) {
int idx = o.index();
int sidx = state.index;
if (idx != sidx) {
Integer j = state.replayObserverFromIndex(idx, o);
o.index(j);
}
synchronized (o) {
if (sidx == state.index) {
o.emitting = false;
skipFinal = true;
break;
}
}
}
} finally {
if (!skipFinal) {
synchronized (o) {
o.emitting = false;
}
}
}
}
};
ssm.onTerminated = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
Expand Down Expand Up @@ -264,6 +300,42 @@ static final <T> ReplaySubject<T> createWithState(final BoundedState<T> state,
Action1<SubjectObserver<T>> onStart) {
SubjectSubscriptionManager<T> ssm = new SubjectSubscriptionManager<T>();
ssm.onStart = onStart;
ssm.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
synchronized (o) {
if (!o.first || o.emitting) {
return;
}
o.first = false;
o.emitting = true;
}
boolean skipFinal = false;
try {
for (;;) {
NodeList.Node<Object> idx = o.index();
NodeList.Node<Object> sidx = state.tail();
if (idx != sidx) {
NodeList.Node<Object> j = state.replayObserverFromIndex(idx, o);
o.index(j);
}
synchronized (o) {
if (sidx == state.tail()) {
o.emitting = false;
skipFinal = true;
break;
}
}
}
} finally {
if (!skipFinal) {
synchronized (o) {
o.emitting = false;
}
}
}
}
};
ssm.onTerminated = new Action1<SubjectObserver<T>>() {

@Override
Expand Down Expand Up @@ -355,9 +427,10 @@ public boolean hasObservers() {

private boolean caughtUp(SubjectObserver<? super T> o) {
if (!o.caughtUp) {
o.caughtUp = true;
state.replayObserver(o);
o.index(null); // once caught up, no need for the index anymore
if (state.replayObserver(o)) {
o.caughtUp = true;
o.index(null); // once caught up, no need for the index anymore
}
return false;
} else {
// it was caught up so proceed the "raw route"
Expand Down Expand Up @@ -423,11 +496,20 @@ public boolean terminated() {
}

@Override
public void replayObserver(SubjectObserver<? super T> observer) {
public boolean replayObserver(SubjectObserver<? super T> observer) {

synchronized (observer) {
observer.first = false;
if (observer.emitting) {
return false;
}
}

Integer lastEmittedLink = observer.index();
if (lastEmittedLink != null) {
int l = replayObserverFromIndex(lastEmittedLink, observer);
observer.index(l);
return true;
} else {
throw new IllegalStateException("failed to find lastEmittedLink for: " + observer);
}
Expand Down Expand Up @@ -525,10 +607,18 @@ public Node<Object> tail() {
return tail;
}
@Override
public void replayObserver(SubjectObserver<? super T> observer) {
public boolean replayObserver(SubjectObserver<? super T> observer) {
synchronized (observer) {
observer.first = false;
if (observer.emitting) {
return false;
}
}

NodeList.Node<Object> lastEmittedLink = observer.index();
NodeList.Node<Object> l = replayObserverFromIndex(lastEmittedLink, observer);
observer.index(l);
return true;
}

@Override
Expand Down Expand Up @@ -571,8 +661,9 @@ interface ReplayState<T, I> {
/**
* Replay contents to the given observer.
* @param observer the receiver of events
* @return true if the subject has caught up
*/
void replayObserver(SubjectObserver<? super T> observer);
boolean replayObserver(SubjectObserver<? super T> observer);
/**
* Replay the buffered values from an index position and return a new index
* @param idx the current index position
Expand Down
75 changes: 70 additions & 5 deletions src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.junit.Test;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.*;
import org.mockito.InOrder;
import org.mockito.Mockito;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.*;
import rx.exceptions.CompositeException;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Func1;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

public class BehaviorSubjectTest {

Expand Down Expand Up @@ -417,4 +419,67 @@ public void testOnErrorThrowsDoesntPreventDelivery2() {
// even though the onError above throws we should still receive it on the other subscriber
assertEquals(1, ts.getOnErrorEvents().size());
}
@Test
public void testEmissionSubscriptionRace() throws Exception {
Scheduler s = Schedulers.io();
Scheduler.Worker worker = Schedulers.io().createWorker();
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final BehaviorSubject<Object> rs = BehaviorSubject.create();

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

worker.schedule(new Action0() {
@Override
public void call() {
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
}

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
rs.onCompleted();
}
}
}
}
69 changes: 66 additions & 3 deletions src/test/java/rx/subjects/ReplaySubjectBoundedConcurrencyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;
import org.junit.*;

import rx.Observable;
import rx.*;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Action1;
import rx.functions.*;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

Expand Down Expand Up @@ -337,4 +337,67 @@ public void run() {
}
}
}
@Test
public void testReplaySubjectEmissionSubscriptionRace() throws Exception {
Scheduler s = Schedulers.io();
Scheduler.Worker worker = Schedulers.io().createWorker();
for (int i = 0; i < 50000; i++) {
if (i % 1000 == 0) {
System.out.println(i);
}
final ReplaySubject<Object> rs = ReplaySubject.createWithSize(2);

final CountDownLatch finish = new CountDownLatch(1);
final CountDownLatch start = new CountDownLatch(1);

worker.schedule(new Action0() {
@Override
public void call() {
try {
start.await();
} catch (Exception e1) {
e1.printStackTrace();
}
rs.onNext(1);
}
});

final AtomicReference<Object> o = new AtomicReference<Object>();

rs.subscribeOn(s).observeOn(Schedulers.io())
.subscribe(new Observer<Object>() {

@Override
public void onCompleted() {
o.set(-1);
finish.countDown();
}

@Override
public void onError(Throwable e) {
o.set(e);
finish.countDown();
}

@Override
public void onNext(Object t) {
o.set(t);
finish.countDown();
}

});
start.countDown();

if (!finish.await(5, TimeUnit.SECONDS)) {
System.out.println(o.get());
System.out.println(rs.hasObservers());
rs.onCompleted();
Assert.fail("Timeout @ " + i);
break;
} else {
Assert.assertEquals(1, o.get());
rs.onCompleted();
}
}
}
}
Loading