Skip to content

Fixed first emission racing with pre and post subscription. #1947

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Dec 9, 2014

There was a subtle race between the subscription and emission which delayed the delivery of the first emission if it happened between the pre- and post-subscription of a subscriber. The fix is the same logic used by the BehaviorSubject to avoid the same problem.

ssm.onAdded = new Action1<SubjectObserver<T>>() {
@Override
public void call(SubjectObserver<T> o) {
synchronized (o) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what this synchronization is doing. We don't synchronize the onNext so why would synchronization here help ensure no race?

Shouldn't the caughtUp function be used to handle this use case on the first onNext to handle any data between onStart and onAdded?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can't. A subscriber arrives before the onNext and thus not yet visible for the caughtUp, subscriber gets registered, and not they sit doing nothing in the original version. One has to do a post-subscription check to see if new value arrived since the pre-subscription. BehaviorSubject does this as well and it is a startup window.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A subscriber arrives before the onNext and thus not yet visible for the caughtUp

That would mean it doesn't receive the onNext and it should receive that value on the next event. That sounds pretty normal for a natural race condition like this. Why wouldn't the next onNext or terminal event take care of catching up?

And what is the synchronized doing? I don't see how it is ever synchronizing between threads since onAdded would only be invoked once and we don't synchronize inside onNext.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now imagine that the Subject receives an onNext and onCompleted in quick succession; if the subscription is delayed between onStart and onAdd, the subscriber will never be notified as there can't be any further events to trigger caughtUp.

Synchronized resolves the race for the replayObserver: it protects the first indicator. If the subscribing thread gets in there first, it starts to replay existing and incoming events until it gets some "break" and the regular caughtUp can pick up. If the emitter thread gets in there first, it will behave as a regular caughtUp and the subscriber thread does nothing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, so it's replayObserver and onAdded that are being synchronized. That's the connection I wasn't making. Thanks.

benjchristensen added a commit that referenced this pull request Dec 12, 2014
Fixed first emission racing with pre and post subscription.
@benjchristensen benjchristensen merged commit f934706 into ReactiveX:1.x Dec 12, 2014
@akarnokd akarnokd deleted the ReplaySubjectFirstEmissionFix branch December 12, 2014 07:20
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants