
Avoid merge operator from over requesting upstream producer. #2765


Closed
dpsm wants to merge 1 commit

Conversation


@dpsm dpsm commented Feb 21, 2015

The merge operator requests from upstream regardless of the number
of items requested by downstream. This creates an infinite
produce/consume cycle that puts pressure on the producer.


dpsm commented Feb 21, 2015

This fixes #2766

@akarnokd
Member

I don't understand the problem you are trying to solve. Could you explain it?


dpsm commented Feb 22, 2015

The merge operator requests from the upstream producer regardless of
downstream requests. When the observable is infinite, the merge operator
buffers all items emitted from upstream, getting into an infinite loop, and
doesn't deliver items downstream even when downstream requests have
been made.

If you run the added test with the original code it won't deliver anything
to the subscriber and should illustrate the problem.
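As a rough illustration of that loop (a plain-Java sketch, not actual RxJava internals; every name here is hypothetical), the operator re-requests from upstream on every onNext, so with a synchronous source the items pile up in its buffer regardless of downstream demand:

```java
// Hypothetical, simplified model of the bug described above -- NOT RxJava code.
// The operator re-requests from upstream on every onNext, ignoring downstream demand.
public class RequestLoopSketch {
    static long downstreamRequested = 0; // downstream has asked for nothing
    static long buffered = 0;            // items parked inside the operator
    static long produced = 0;

    // A synchronous upstream: each requested unit is delivered immediately.
    // (Capped at 1_000 items so the sketch terminates.)
    static void request(long n) {
        for (long i = 0; i < n && produced < 1_000; i++) {
            produced++;
            onNext();
        }
    }

    static void onNext() {
        if (downstreamRequested > 0) {
            downstreamRequested--;       // deliver to downstream
        } else {
            buffered++;                  // no demand: park the item
        }
        request(1);                      // the bug: unconditional re-request
    }

    public static void main(String[] args) {
        request(1);                      // one initial request...
        // ...yet the operator ends up buffering everything the source produced
        System.out.println("buffered = " + buffered);
    }
}
```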

@@ -213,7 +213,7 @@ private void handleNewSource(Observable<? extends T> t) {
     InnerSubscriber<T> i = new InnerSubscriber<T>(this, producerIfNeeded);
     i.sindex = childrenSubscribers.add(i);
     t.unsafeSubscribe(i);
-    if (!isUnsubscribed()) {
+    if ((producerIfNeeded == null || producerIfNeeded.requested > 0) && !isUnsubscribed()) {
dpsm (Contributor Author):

@akarnokd the original code calls request(1) regardless of the "requested" variable, causing the operator to potentially buffer a dangerous number of items and also creating a loop: producer.request() -> consumer.onNext() -> producer.request() -> ...

akarnokd (Member):

Merge deliberately doesn't limit the source observable of observables; this is what MergeMaxConcurrent is for. Besides, I can't rule out a race for requested.



dpsm commented Feb 27, 2015

@benjchristensen thoughts?

@dpsm dpsm mentioned this pull request Feb 28, 2015
@benjchristensen
Member

This one is tricky ... @akarnokd is correct when he says this:

Merge deliberately doesn't limit the source observable of observables; this is what MergeMaxConcurrent is for

We can't arbitrarily restrict subscribing to upstream Observables as we could deadlock the system, but it does seem to make sense to not request more upstream if the downstream is sitting at 0.

If requested > 0 we always have to subscribe upstream though, so we would only fix a specific case, not the more general issue of horizontal buffer bloat. If the consumer is slow and a very fast Observable<Observable<T>> keeps emitting inner Observables then merge will keep subscribing to them as soon as requested goes above 1, and then fill their buffers.

Thus I don't actually see this solving the problem, perhaps just reducing it or making it less likely.

Whatever the per-InnerSubscriber buffer size is, it can always be filled.

Here's the example:

  • 10 Observables emitted and subscribed to, each fills a buffer of 128
  • requested drains and hits 0
  • 1000 new Observables are emitted, but none subscribed to while requested is at 0
  • request(128) happens
  • all 1000 Observables must be subscribed to in addition to the original 10, otherwise either deadlock can occur (since it's one of the 1000 that has data and the original 10 are now quiet) or we get a completely unfair system where only the original 10 will ever emit.
  • requested quickly hits 0 again
  • we now have 1010 Observables with 128 items buffered in each
  • the process repeats itself with millions of Observables

This is a downward spiral that will occur even if we don't request upstream when requested == 0 and is why we have merge(o, maxConcurrent) and recently added flatMap(f, maxConcurrent) as an overload.

Here is an example showing this in action: https://gist.github.com/benjchristensen/a0350776a595fd6e3810#file-parallelexecution-java-L78

    private static void flatMapBufferedExampleAsync() {
        final AtomicInteger total = new AtomicInteger();
        Observable.range(0, 500000000)
                .doOnNext(i -> total.incrementAndGet())
                .buffer(100)
                .doOnNext(i -> System.out.println("emit " + i))
                .flatMap(i -> {
                    return Observable.from(i).subscribeOn(Schedulers.computation()).map(item -> {
                        // simulate computational work
                        try {
                            Thread.sleep(10);
                        } catch (Exception e) {
                        }
                        return item + " processed " + Thread.currentThread();
                    });
                }, Runtime.getRuntime().availableProcessors())
                .toBlocking().forEach(System.out::println);

        System.out.println("total emitted: " + total.get());
    }
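The arithmetic of that downward spiral can also be sketched directly (a hypothetical model in plain Java, not RxJava code; the 128 buffer size and the 10 / 1000 source counts come from the scenario described above):

```java
// Numeric sketch of the "horizontal buffer bloat" described above. Assumes a
// per-source buffer of 128 (RxRingBuffer.SIZE in the discussion); every
// request cycle all newly emitted inner Observables get subscribed, and each
// can fill its own buffer.
public class BufferBloatSketch {
    static final int BUFFER_SIZE = 128;   // assumed buffer size per inner source

    // worst case: every subscribed inner Observable fills its own buffer
    static long worstCaseBuffered(long innerObservables) {
        return innerObservables * BUFFER_SIZE;
    }

    public static void main(String[] args) {
        long inner = 10;                  // initial inner Observables
        for (int cycle = 1; cycle <= 3; cycle++) {
            inner += 1_000;               // all new sources subscribed once requested > 0
            System.out.println("cycle " + cycle + ": " + inner
                    + " inner Observables, up to "
                    + worstCaseBuffered(inner) + " items buffered");
        }
    }
}
```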


dpsm commented Feb 28, 2015

@benjchristensen I see your point and it makes sense. The particular issue that led me to this patch is that I had an infinite observable lifted with flatMap, and the observable inside flatMap was synchronous, which really makes the maxConcurrent overload irrelevant in this case.

This causes a problem where, given no downstream requests, the unbounded SourceSubscriber.List accumulates an ever-growing number of MergeItemSubscriber instances, each holding a small number of items in its MergeItemSubscriber.RxRingBuffer.

What direction should we take to tackle this?

@benjchristensen
Member

making the concurrent overload irrelevant in this case

It is relevant in limiting the horizontal growth :-)

I'm trying to think through this scenario and it seems the same degenerate case can still happen, just more slowly if we fix it to not subscribe when requested == 0.

For example:

  • emit 10 Observables that each emit 5 items and do NOT complete
  • subscribe and emit 50 items
  • slow consumer does not request more so we have 128-50=78
  • 10 subscribers in SourceSubscriber.List
  • receive 50 more Observables that emit 10 items each
  • 500 onNext and 78 can be emitted so we have 422 in queues
  • 510 subscribers in SourceSubscriber.List and 422 items

It is at this point that we decide what to do ...

  • 1000 new Observables are to be given us ... but requested == 0
  • If there are items in buffer (the 422) we should say "no" and drain those first
  • Once the 422 are drained, we would still have 510 Observables in the list if they are infinite child Observables. If they are finite then they would clean out okay.
  • Then it would request more upstream.

Does it request 1 at a time? Doesn't that result in starvation? How does this logic work with async?

NOTE: I'm dumping thoughts here to continue the conversation and show I'm thinking about it. It's not yet clear to me what the implications of this type of change are.
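The drain-first idea sketched in the bullets above could be modeled like this (a hypothetical policy in plain Java, not existing RxJava behavior; the 422 / 510 / 1000 numbers come from the scenario in the comment):

```java
// Sketch of the proposed drain-first policy: while parked items remain,
// downstream requests drain the queue and no new inner sources are admitted;
// once the queue is empty, pending sources get subscribed. Hypothetical model.
public class DrainFirstSketch {
    static int queued = 422;               // items parked, from the scenario above
    static int pendingSources = 1_000;     // new inner Observables waiting
    static int subscribedSources = 510;    // already in SourceSubscriber.List

    static void request(int n) {
        int drained = Math.min(n, queued);
        queued -= drained;                 // deliver buffered items first
        if (queued == 0) {                 // only now admit the pending sources
            subscribedSources += pendingSources;
            pendingSources = 0;
        }
        System.out.println("drained=" + drained + " queued=" + queued
                + " subscribed=" + subscribedSources);
    }

    public static void main(String[] args) {
        request(128);   // 294 still queued, no new subscriptions yet
        request(128);   // 166 still queued
        request(128);   // 38 still queued
        request(128);   // queue empties; the 1000 pending sources are admitted
    }
}
```

Even in this model the open questions at the end of the comment remain: once admitted, the 1000 sources still arrive all at once.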

@benjchristensen
Member

@akarnokd Considering the nuance of this and the use cases we've discussed, what is your take? Can we improve this edge case safely without negatively impacting other things? Is the degenerate case I refer to only applicable to async or also sync?


akarnokd commented Mar 4, 2015

Sorry, I had no time to think about this problem in depth. If there's no hurry, can I come back to this issue tomorrow?

@benjchristensen
Member

Yes, definitely. When you get a chance. This is not an easy one.


akarnokd commented Mar 5, 2015

The main benefit of merge is that it starts the inner Observables immediately allowing them to perform their async work and have it ready for consumption. This PR essentially puts a limit on how many sources can be active (RxRingBuffer.SIZE) and slowly increases this limit on each downstream request as @benjchristensen pointed out. I don't think any adaptive scheme would work with merge. You need mergeMaxConcurrent instead in your scenario and you can determine the simultaneous connected observable count, so in conclusion, I don't support this PR's changes.
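For reference, the idea behind mergeMaxConcurrent can be sketched in plain Java (a simplified model, not the actual operator): at most a fixed number of inner sources are subscribed at once, and further sources wait until an active one completes rather than each filling its own buffer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java sketch of the maxConcurrent idea discussed above. Hypothetical
// model: at most MAX_CONCURRENT inner sources are "subscribed" at once;
// later sources are deferred until an active one completes.
public class MaxConcurrentSketch {
    static final int MAX_CONCURRENT = 2;          // assumed cap
    static int active = 0;
    static final Queue<Integer> pending = new ArrayDeque<>();
    static final List<String> log = new ArrayList<>();

    static void sourceEmitted(int id) {           // outer Observable emits an inner source
        if (active < MAX_CONCURRENT) start(id);
        else pending.add(id);                     // defer instead of buffering its items
    }

    static void start(int id) {
        active++;
        log.add("subscribe " + id);
    }

    static void sourceCompleted(int id) {         // inner source finished (async in reality)
        active--;
        log.add("complete " + id);
        Integer next = pending.poll();
        if (next != null) start(next);            // a freed slot admits the next source
    }

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) sourceEmitted(i);  // 0 and 1 start; 2 and 3 wait
        sourceCompleted(0);                             // frees a slot -> 2 starts
        sourceCompleted(1);                             // frees a slot -> 3 starts
        log.forEach(System.out::println);
    }
}
```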


akarnokd commented May 6, 2015

I'm closing this in favor of #2928.

@akarnokd akarnokd closed this May 6, 2015