Operator buffer(time) and buffer(time, size) now support backpressure. #2806
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Added backpressure support to
buffer(time)
andbuffer(time, size)
non-overlapping variants as requested in #2797.A request of
n | n < Long.MAX_VALUE
is converted to a request ofn * size
for the upstream. Every time the buffer's size reaches the specified amount, the timer is "paused". The remaining time for it to fire is calculated and stored so the next time the timer is started again with this remaining time, either by a downstream request 0 -> !0 transition or by theonNext
itself because downstream request is not null.Note that this start-stop behavior is of millisecond accuracy regardless of the original time unit specified because our scheduler's now() return the time in millisecond resolution.
I've also added some new assert methods and constructors to
TestSubscriber
to make state checks more convenient and support a common zero request at subscription directly (so no need to extend the class just for this anymore).