
fix OperatorConcat race condition where request lost #2960

Merged
merged 1 commit into from
May 27, 2015

Conversation

davidmoten
Collaborator

If a request arrives while ConcatInnerSubscriber is being constructed (L199-201), that additional request does not induce a request from the subscriber and could stall the stream. The fix is not to pass the requested value in the constructor but to call ConcatInnerSubscriber.requestMore with requested only once currentSubscriber is set.

@@ -172,7 +172,8 @@ void subscribeNext() {
child.onCompleted();
} else if (o != null) {
Observable<? extends T> obs = nl.getValue(o);
-currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested);
+currentSubscriber = new ConcatInnerSubscriber<T>(this, child);
+currentSubscriber.requestMore(requested);
Member

This doesn't really help much (the constructor version is behaviorally equivalent) because there is still a small window between requesting the requested amount and setting the current subscriber. Swapping the requestMore call and the assignment doesn't help either, because then a concurrent request will ask for r0 + 2n: r0 + n because requested has already been updated, and n again because the reference is non-null. You need my ProducerArbiter. However, if you can wait a few hours, I'll post a PR with all my useful producers put into rx.internal.producers, since making them 'official' takes several days of back-and-forth.
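
For illustration, the arbitration idea mentioned in this comment can be sketched roughly as follows. This is a minimal, lock-based sketch and not the actual ProducerArbiter from rx.internal.producers; the names SimpleArbiter and RecordingProducer are hypothetical, and the local Producer interface is only a stand-in for rx.Producer. The point it tries to show is that the outstanding demand is tracked in one place, so a request that races with a producer switch is either included in the amount replayed to the new producer or forwarded to it directly, but never both and never neither.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for rx.Producer (assumption): something that can be asked for n more items. */
interface Producer {
    void request(long n);
}

/** Hypothetical helper that records every request amount it receives. */
final class RecordingProducer implements Producer {
    final List<Long> requests = new ArrayList<>();

    @Override
    public synchronized void request(long n) {
        requests.add(n);
    }
}

/** Hypothetical, simplified arbiter; NOT the real ProducerArbiter implementation. */
final class SimpleArbiter implements Producer {
    private long requested;   // demand from downstream that has not been satisfied yet
    private Producer current; // producer that currently receives requests, may be null

    @Override
    public void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        Producer p;
        synchronized (this) {
            long sum = requested + n;
            requested = sum < 0 ? Long.MAX_VALUE : sum; // cap at MAX_VALUE on overflow
            p = current;                                // read in the same critical section
        }
        if (p != null) {
            p.request(n); // forward only the new amount to the active producer
        }
    }

    /** Called after n items have been delivered downstream. */
    void produced(long n) {
        synchronized (this) {
            if (requested == Long.MAX_VALUE) {
                return; // unbounded demand is never decremented
            }
            if (n > requested) {
                throw new IllegalStateException("more items arrived than were requested");
            }
            requested -= n;
        }
    }

    /** Switches to the next producer and replays whatever demand is still outstanding. */
    void setProducer(Producer p) {
        long outstanding;
        synchronized (this) {
            current = p;
            outstanding = requested; // snapshot taken atomically with the switch
        }
        if (p != null && outstanding > 0) {
            p.request(outstanding);
        }
    }
}
```

Because request and setProducer update requested and current under the same lock, there is no window in which a request can see a null reference after its amount has already been replayed, which is the window the original constructor-based code leaves open.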

Member

Sounds like you want to fix this issue using #2963 ?

@akarnokd akarnokd added the Bug label May 19, 2015
@davidmoten
Collaborator Author

Yeah, my focus was very narrow. This is a similar situation to retry, so `ProducerArbiter` will be the go. Those producers sound great, thanks.

@akarnokd
Member

The new producers have been merged into 1.x.

@akarnokd
Member

Could you also see if #2876 passes as well?

@davidmoten davidmoten force-pushed the concat-race branch 2 times, most recently from a6a66bc to 4bdfdf8 Compare May 27, 2015 07:57
@davidmoten
Collaborator Author

Ok, I've introduced the joys of ProducerArbiter to OperatorConcat. Tests pass, and I've added the test from #2876, which doesn't fail after running for tens of minutes. There is one catch though: I changed the #2876 test to use Observable.from(Arrays.asList(t)) instead of Observable.just(t) so that the source would be backpressure aware. Using ProducerArbiter means that sources that emit more than requested, like Observable.just, may induce an IllegalStateException from ProducerArbiter.

I'm supposing at some stage in the future (2.0?) we will be strict about our sources but till then perhaps we should allow a ProducerArbiter to be created in non-strict mode so that if a source emits more than requested it is forgiving about it?
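
To make the strict versus non-strict proposal concrete, here is a small sketch of the accounting involved. It is only an illustration under stated assumptions: DemandCounter and its strict flag are hypothetical names and are not part of RxJava or of ProducerArbiter. In strict mode, a source that delivers more than was requested triggers an IllegalStateException; in the forgiving mode, the counter is simply clamped at zero.

```java
/** Hypothetical demand counter; the strict flag is an assumption, not an RxJava API. */
final class DemandCounter {
    private final boolean strict;
    private long requested; // outstanding demand

    DemandCounter(boolean strict) {
        this.strict = strict;
    }

    synchronized void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required but it was " + n);
        }
        long sum = requested + n;
        requested = sum < 0 ? Long.MAX_VALUE : sum; // cap at MAX_VALUE on overflow
    }

    /** Records that n items were delivered downstream. */
    synchronized void produced(long n) {
        if (requested == Long.MAX_VALUE) {
            return; // unbounded demand: nothing to account for
        }
        if (n > requested) {
            if (strict) {
                throw new IllegalStateException(
                        "more items arrived than were requested: requested = "
                        + requested + ", produced = " + n);
            }
            requested = 0; // forgiving mode: tolerate a source that ignores backpressure
            return;
        }
        requested -= n;
    }

    synchronized long outstanding() {
        return requested;
    }
}
```

The trade-off is that the forgiving mode hides a misbehaving source instead of reporting it, so a strict default with an opt-out may be easier to tighten later.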

@@ -795,4 +795,32 @@ public void onNext(Integer t) {
assertTrue(completed.get());
}

+@Test//(timeout = 100000)
+public void concatMapRangeAsyncLoopIssue2876() {
+final long durationSeconds = 10;
Member

Could you change this to 2 seconds?

@akarnokd
Member

No, because it would derail the state validation.

@davidmoten
Collaborator Author

Changed the test duration to 2 seconds and added an error message to ProducerArbiter so users will have a bit more of an idea what went wrong if the produced-more-than-requested IllegalStateException occurs.

@davidmoten
Collaborator Author

Unrelated CI failure, issue #2870 covers it.

rx.internal.operators.OperatorSerializeTest > testMultiThreadedWithNPEinMiddle FAILED
    java.lang.AssertionError
        at org.junit.Assert.fail(Assert.java:92)
        at org.junit.Assert.assertTrue(Assert.java:43)
        at org.junit.Assert.assertTrue(Assert.java:54)
        at rx.internal.operators.OperatorSerializeTest.testMultiThreadedWithNPEinMiddle(OperatorSerializeTest.java:134)

@akarnokd
Member

Maybe it is time to make just() do backpressure properly.

Would you like to pursue a fix for testMultiThreadedWithNPEinMiddle? I think if the exception itself is created well upfront and not new'd in place, the chance of it being delayed until the very end should decrease significantly.
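
A rough sketch of the "create the exception upfront" idea follows. It is not the code of OperatorSerializeTest; FailingEmitter and its parameters are hypothetical. Constructing a Throwable captures a stack trace, which takes time, so preparing the instance before the emission loop keeps that cost out of the moment at which the failure is supposed to happen.

```java
import java.util.function.IntConsumer;

/** Hypothetical emitter that fails at a chosen index with a pre-built exception. */
final class FailingEmitter {
    private FailingEmitter() {
    }

    /**
     * Emits 0..count-1 to onNext, but throws the prepared exception when it reaches failAt.
     * The exception is passed in, so no allocation or stack-trace capture happens in the loop.
     */
    static void emit(int count, int failAt, RuntimeException prepared, IntConsumer onNext) {
        for (int i = 0; i < count; i++) {
            if (i == failAt) {
                throw prepared; // thrown as-is; it was constructed before the loop started
            }
            onNext.accept(i);
        }
    }
}
```

A caller would build the exception once, for example `new NullPointerException("prepared")`, before starting the emitting threads and hand the same instance to emit.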

@davidmoten
Collaborator Author

Sure, I can look at it.

By the way I've removed unused imports from OperatorConcatTest and also changed the onStart so that it only requests as many inner observables as it needs. Do you think this is ok?

@davidmoten
Collaborator Author

I'd love to see just() support backpressure, though I assume we will take a biggish performance hit if we do.

@akarnokd
Member

I think requesting 2 was no accident: while working on the current, the next can be 'created' asynchronously. Otherwise, once the current is processed, you need to wait more for the next Observable source to arrive.

@davidmoten
Collaborator Author

The only catch is that by requesting 2 we make the assumption that creating this spare in advance doesn't require significant resources. If an unsubscribe occurs while the first inner is emitting then the act of creating the second is wasted (and may have been demanding).

@davidmoten
Collaborator Author

There's a compromise either way for sure. Perhaps we could request 2 inners if Long.MAX_VALUE is requested and otherwise 1.
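
The compromise suggested here could look something like the sketch below. The class InnerPrefetch and the method initialInnerRequest are hypothetical names used only for illustration; OperatorConcat does not necessarily contain anything like them.

```java
/** Hypothetical helper expressing the proposed prefetch heuristic. */
final class InnerPrefetch {
    private InnerPrefetch() {
    }

    /**
     * Returns how many inner Observables to request from the outer source up front:
     * 2 when downstream demand is unbounded (one being consumed plus one spare),
     * otherwise 1 so that no spare inner is created before it is needed.
     */
    static int initialInnerRequest(long downstreamRequested) {
        return downstreamRequested == Long.MAX_VALUE ? 2 : 1;
    }
}
```

With unbounded demand the spare inner is likely to be consumed anyway, so prefetching costs little; with bounded demand an early unsubscribe would waste the work of creating it.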

@akarnokd
Member

I'd say let's have it be 2 by default and have another PR that offers an overload of concat() and concatMap() where the 'prefetch' amount can be specified.

@davidmoten
Collaborator Author

Sounds good. We can discuss further in another PR.

@akarnokd
Member

Looks okay to me now. Thanks!

akarnokd added a commit that referenced this pull request May 27, 2015
fix OperatorConcat race condition where request lost
@akarnokd akarnokd merged commit bb170b5 into ReactiveX:1.x May 27, 2015
@davidmoten davidmoten deleted the concat-race branch May 27, 2015 18:00