-
Notifications
You must be signed in to change notification settings - Fork 7.6k
fix OperatorConcat race condition where request lost #2960
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -172,7 +172,8 @@ void subscribeNext() { | |||
child.onCompleted(); | |||
} else if (o != null) { | |||
Observable<? extends T> obs = nl.getValue(o); | |||
currentSubscriber = new ConcatInnerSubscriber<T>(this, child, requested); | |||
currentSubscriber = new ConcatInnerSubscriber<T>(this, child); | |||
currentSubscriber.requestMore(requested); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't really help much (the constructor version is behaviorally equivalent) because there is still a small window between requesting the requested
and setting the current
. Swapping the requestMore
and set
doesn't help either because then a concurrent get will request r0 + 2n: r0 + n because requested is updated and n again because the reference is non-null. You need my ProducerArbiter
, however, If you can wait a few hours, I'll post a PR with all my useful producers put into rx.internal.producers
since making them 'official' takes several days of back-and-forth.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds like you want to fix this issue using #2963 ?
Yeah my focus was very narrow. Similar situation to retry so 'ProducerArbiter` will be the go. Those producers sound great, thanks. |
The new producers have been merged into 1.x. |
Could you also see if #2876 passes as well? |
a6a66bc
to
4bdfdf8
Compare
Ok, I've introduced the joys of I'm supposing at some stage in the future (2.0?) we will be strict about our sources but till then perhaps we should allow a |
@@ -795,4 +795,32 @@ public void onNext(Integer t) { | |||
assertTrue(completed.get()); | |||
} | |||
|
|||
@Test//(timeout = 100000) | |||
public void concatMapRangeAsyncLoopIssue2876() { | |||
final long durationSeconds = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you change this to 2 seconds?
No, because it would derail the state validation. |
Changed the test duration to 2 seconds and added error message to |
Unrelated CI failure, issue #2870 covers it.
|
Maybe it is time to make just() do backpressure properly. Would you like to pursue a fix for |
Sure, I can look at it. By the way I've removed unused imports from |
I'd love to see |
I think requesting 2 was no accident: while working on the current, the next can be 'created' asynchronously. Otherwise, once the current is processed, you need to wait more for the next Observable source to arrive. |
The only catch is that by requesting 2 we make the assumption that creating this spare in advance doesn't require significant resources. If an unsubscribe occurs while the first inner is emitting then the act of creating the second is wasted (and may have been demanding). |
There's a compromise either way for sure. Perhaps we could request 2 inners if |
I'd say, let's have it 2 on default and have another PR which offers an overload to |
Sounds good. We can discuss further in another PR. |
Looks okay to me now. Thanks! |
fix OperatorConcat race condition where request lost
If a request arrives while
ConcatInnerSubscriber
is being constructed (L199-201) then that additional request does not induce a request from the subscriber and could stall the stream. The fix is not to passrequested
value in the constructor but to callConcatInnerSubscriber.requestMore
withrequested
only oncecurrentSubscriber
is set.