Skip to content

Operator buffer(time) and buffer(time, size) now support backpressure. #2806

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

akarnokd
Copy link
Member

@akarnokd akarnokd commented Mar 5, 2015

Added backpressure support to buffer(time) and buffer(time, size) non-overlapping variants as requested in #2797.

A request of n | n < Long.MAX_VALUE is converted to a request of n * size for the upstream. Every time the buffer's size reaches the specified amount, the timer is "paused". The remaining time for it to fire is calculated and stored so the next time the timer is started again with this remaining time, either by a downstream request 0 -> !0 transition or by the onNext itself because downstream request is not null.

Note that this start-stop behavior is of millisecond accuracy regardless of the original time unit specified because our scheduler's now() return the time in millisecond resolution.

I've also added some new assert methods and constructors to TestSubscriber to make state checks more convenient and support a common zero request at subscription directly (so no need to extend the class just for this anymore).

@akarnokd akarnokd force-pushed the OperatorBufferSizeTimeBackpressure branch from 95342ad to dd3d7e0 Compare March 5, 2015 17:52
@benjchristensen
Copy link
Member

I don't think we should be changing the behavior of 'buffer(time)', since time is specifically using a temporal approach to flow control.

The 'buffer(size)' variant can definitely have backpressure though as per #2797

I'm not sure how 'buffer(time, count)' should work. One way of looking at that is that it is a temporal operator wih max size. Another is it is a size operator with timeout. If it is temporal then it shouldn't have backpressure. If it is size with timeout then backpressure makes sense.

@akarnokd
Copy link
Member Author

Because how this "stops" the timer and resumes it, the changes introduce some additional timing-related jitter. I'd say buffers and windows with time won't support backpressure at all.

@benjchristensen
Copy link
Member

Then this PR should not be pursued, correct?

We do need an answer though for the 'buffer(count, timeout)' use case. That one is common in scenarios where it wants backpressure and time is just a timeout.

It's almost like there are:

  • buffer(count, timeout)
  • buffer(time, maxSize)

They are basically the same without backpressure, but including backpressure makes them have different behaviors when backpressure occurs.

@akarnokd
Copy link
Member Author

Currently in the buffer(time, size), time is a periodic indicator to emit what's in the buffer and start a new one. What you suggest is that restart the timer whenever the buffer is full or timeout happened. I did this at first, but then one or two tests failed because they expect the periodicity mentioned above.

@benjchristensen
Copy link
Member

Can you separate out the TestSubscriber changes?

@akarnokd
Copy link
Member Author

Sure.

@akarnokd akarnokd closed this May 14, 2015
@akarnokd akarnokd deleted the OperatorBufferSizeTimeBackpressure branch September 9, 2015 15:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants