Avoid merge operator from over requesting upstream producer. #2765
The operator requests from upstream disregarding the amount of items requested from downstream. This creates an infinite produce/consume cycle that puts pressure on the producer.
Fixes #2766
I don't understand the problem you are trying to solve. Could you explain it?
The merge operator requests from the upstream producer regardless of downstream demand. If you run the added test with the original code, it won't deliver anything.
```diff
@@ -213,7 +213,7 @@ private void handleNewSource(Observable<? extends T> t) {
     InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
     i.sindex = childrenSubscribers.add(i);
     t.unsafeSubscribe(i);
-    if (!isUnsubscribed()) {
+    if ((producerIfNeeded == null || producerIfNeeded.requested > 0) && !isUnsubscribed()) {
```
@akarnokd the original code calls request(1) regardless of the "requested" variable, causing the operator to potentially buffer a dangerous number of items, and also a loop: producer.request() -> consumer.onNext() -> producer.request() -> ...
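The feedback loop described above can be sketched in isolation. This is a hypothetical, simplified simulation (plain Java, not RxJava's actual merge internals): a synchronous producer delivers items immediately on `request(n)`, and the consumer re-requests one item per delivery. Without gating the re-request on remaining downstream demand, the request/onNext cycle spins unboundedly; with the gate, emission stops exactly when demand is met.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RequestLoopSketch {
    // Downstream demand: how many items the consumer still wants.
    static final AtomicLong requested = new AtomicLong(2);
    static long emitted = 0;
    static final long SAFETY_CAP = 1_000_000; // stops the ungated spiral in this demo

    // Synchronous producer: every request(n) delivers n items immediately.
    static void request(long n) {
        for (long i = 0; i < n && emitted < SAFETY_CAP; i++) {
            emitted++;
            onNext();
        }
    }

    // Consumer callback that re-requests one item per delivery.
    static void onNext() {
        long remaining = requested.decrementAndGet();
        if (remaining > 0) { // <- the gate; drop this check and the
            request(1);      //    request/onNext cycle runs to SAFETY_CAP
        }
    }

    public static void main(String[] args) {
        request(1);
        System.out.println("emitted: " + emitted);               // 2 with gating
        System.out.println("unmet demand: " + requested.get());  // 0
    }
}
```

This mirrors the shape of the patch: re-request only while `requested > 0`, analogous to the `producerIfNeeded.requested > 0` guard in the diff.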
Merge deliberately doesn't limit the source observable of observables; this is what mergeMaxConcurrent is for. Besides, I can't rule out a race for `requested`.
/cc @benjchristensen
@benjchristensen thoughts?
This one is tricky ... @akarnokd is correct when he says this:
We can't arbitrarily restrict subscribing to upstream. Thus I don't actually see this solving the problem, perhaps just reducing it or making it less likely, whatever the buffer size is per inner Observable.

Here's the example:
This is a downward spiral that will occur even if we don't request upstream when there is no downstream demand. Here is an example showing this in action: https://gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78

```java
private static void flatMapBufferedExampleAsync() {
    final AtomicInteger total = new AtomicInteger();
    Observable.range(0, 500000000)
            .doOnNext(i -> total.incrementAndGet())
            .buffer(100)
            .doOnNext(i -> System.out.println("emit " + i))
            .flatMap(i -> {
                return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                    // simulate computational work
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
                    }
                    return item + " processed " + Thread.currentThread();
                });
            }, Runtime.getRuntime().availableProcessors()).toBlocking().forEach(System.out::println);
    System.out.println("total emitted: " + total.get());
}
```
@benjchristensen I see your point and it makes sense. The particular issue which led me to this patch is that I had an infinite observable lifted with flatMap, and within flatMap the observable was synchronous, which really makes the concurrency overload irrelevant in this case. The problem is that, given there are no downstream requests, the unbounded SourceSubscriber.List now holds an ever growing number of MergeItemSubscriber instances, each with a small number of items in its MergeItemSubscriber.RxRingBuffer. What direction should we take to tackle this?
It is relevant in limiting the horizontal growth :-) I'm trying to think through this scenario and it seems the same degenerate case can still happen, just more slowly, if we fix it to not subscribe when there is no downstream demand. For example:
It is at this point where we decided what to do ...
Does it request 1 at a time? Doesn't that result in starvation? How does this logic work with async?

NOTE: I'm dumping thoughts here to continue the conversation and show I'm thinking about it. It's not yet clear to me what the implications of this type of change are.
@akarnokd Considering the nuance of this and the use cases we've discussed, what is your take? Can we improve this edge case safely without negatively impacting other things? Is the degenerate case I refer to only applicable to async or also sync?
Sorry, I had no time to think about this problem in depth. If there's no hurry, can I come back to this issue tomorrow?
Yes, definitely. When you get a chance. This is not an easy one.
The main benefit of merge is that it starts the inner Observables immediately, allowing them to perform their async work and have it ready for consumption. This PR essentially puts a limit on how many sources can be active (RxRingBuffer.SIZE) and slowly increases this limit on each downstream request, as @benjchristensen pointed out. I don't think any adaptive scheme would work with merge. You need mergeMaxConcurrent instead in your scenario, where you can determine the number of simultaneously connected observables. In conclusion, I don't support this PR's changes.
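The mergeMaxConcurrent behavior referred to above can be illustrated with a small, hypothetical bookkeeping sketch (plain Java, not the actual RxJava operator): subscribe to a new inner source immediately only while the number of active subscriptions is under the cap, and queue the rest until a running source completes.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MaxConcurrentSketch {
    final int maxConcurrent;
    int active = 0;
    int peakActive = 0;
    final Queue<Runnable> pending = new ArrayDeque<>();

    MaxConcurrentSketch(int maxConcurrent) {
        this.maxConcurrent = maxConcurrent;
    }

    // A new inner source arrives: subscribe now only if under the cap,
    // otherwise queue it until a running source completes.
    void onNewSource(Runnable subscribe) {
        if (active < maxConcurrent) {
            active++;
            peakActive = Math.max(peakActive, active);
            subscribe.run();
        } else {
            pending.add(subscribe);
        }
    }

    // An inner source completed: free a slot and drain one pending source.
    void onSourceComplete() {
        active--;
        Runnable next = pending.poll();
        if (next != null) {
            active++;
            peakActive = Math.max(peakActive, active);
            next.run();
        }
    }

    public static void main(String[] args) {
        MaxConcurrentSketch merge = new MaxConcurrentSketch(2);
        for (int i = 0; i < 5; i++) {
            final int id = i;
            merge.onNewSource(() -> System.out.println("subscribed to inner source " + id));
        }
        // Only 2 of the 5 sources subscribed so far; completing one admits the next.
        merge.onSourceComplete();
        System.out.println("peak concurrent subscriptions: " + merge.peakActive); // 2
    }
}
```

This caps the "horizontal" growth mentioned earlier: no matter how fast the outer observable emits inner sources, at most maxConcurrent of them are subscribed (and buffering) at once.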
I'm closing this in favor of #2928.