Backpressure
_**Work in progress...**_
- Introduction
For example, imagine using the [`zip`](Combining-Observables#zip) operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. A naive implementation of the `zip` operator would have to maintain an ever-expanding buffer of items emitted by the faster Observable to eventually combine with items emitted by the slower one. This could cause RxJava to seize an unwieldy amount of system resources.
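To see how quickly such a buffer grows, here is a minimal plain-Java sketch. It does not use RxJava, and the `NaiveZipModel` class and its `pendingAfter` method are hypothetical names introduced only for illustration. It assumes the fast source emits two items per tick and the slow source emits one, and it counts the items left waiting for a partner.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical model of a naive zip buffer; not part of RxJava. */
final class NaiveZipModel {

    /**
     * Simulates `ticks` time steps in which the fast source emits two items
     * and the slow source emits one, then returns how many fast items are
     * still buffered, waiting for a partner from the slow source.
     */
    static int pendingAfter(int ticks) {
        Deque<Integer> fastBuffer = new ArrayDeque<>();
        int nextFastItem = 0;
        for (int t = 0; t < ticks; t++) {
            // The fast source emits twice per tick; both items must be held.
            fastBuffer.add(nextFastItem++);
            fastBuffer.add(nextFastItem++);
            // The slow source emits once per tick; pair it with the oldest
            // buffered fast item, which frees exactly one slot.
            fastBuffer.poll();
        }
        return fastBuffer.size();
    }
}
```

Under these assumptions the backlog grows by one item per tick and never shrinks, which is the unbounded growth described above.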
There are a variety of strategies with which you can exercise flow control and backpressure in RxJava in order to alleviate the problems caused when a quickly-producing Observable meets a slow-consuming observer. This page explains some of these strategies, and also shows you how you can design your own Observables and Observable operators to respect requests for flow control.
- Hot and cold Observables, and multicasted Observables
A _hot_ Observable begins generating items to emit immediately when it is created. Subscribers typically begin observing the sequence of items emitted by a hot Observable from somewhere in the middle of the sequence, beginning with the first item emitted by the Observable subsequent to the establishment of the subscription. Such an Observable emits items at its own pace, and it is up to its observers to keep up. Examples of items emitted by a hot Observable might include mouse & keyboard events, system events, or stock prices.
When a cold Observable is _multicast_ (when it is converted into a `ConnectableObservable` and its `connect()` method is called), it effectively becomes _hot_ and for the purposes of backpressure and flow-control it should be treated as a hot Observable.
Cold Observables are ideal subjects for the reactive pull model of backpressure described below. Hot Observables are typically not designed to cope well with a reactive pull model, and are better candidates for some of the other flow control strategies discussed on this page, such as the use of the `onBackpressureBuffer` or `onBackpressureDrop` operators, throttling, buffers, or windows.
- Useful operators that avoid the need for backpressure
By fine-tuning the parameters to these operators you can ensure that a slow-consuming observer is not overwhelmed by a fast-producing Observable.
- Throttling
The following diagrams show how you could use each of these operators on the bursty Observable shown above.
- sample (or throttleLast)
````groovy
Observable<Integer> burstySampled = bursty.sample(500, TimeUnit.MILLISECONDS);
````
- throttleFirst
````groovy
Observable<Integer> burstyThrottled = bursty.throttleFirst(500, TimeUnit.MILLISECONDS);
````
- debounce (or throttleWithTimeout)
````groovy
Observable<Integer> burstyDebounced = bursty.debounce(10, TimeUnit.MILLISECONDS);
````
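The difference between the three throttling operators is easier to compare on a fixed list of timestamped items. The following is a simplified plain-Java model, not the RxJava implementation. The `ThrottleModels` class and its method names are hypothetical, timestamps are assumed to be non-negative milliseconds in ascending order, and scheduler timing and boundary cases (for example an item arriving exactly on a period boundary) are glossed over.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified models of three throttling strategies. */
final class ThrottleModels {

    /** sample: keep the last item seen in each fixed period of `period` ms. */
    static List<Integer> sample(long[] times, int[] items, long period) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean lastInPeriod = i == items.length - 1
                    || times[i + 1] / period != times[i] / period;
            if (lastInPeriod) {
                out.add(items[i]);
            }
        }
        return out;
    }

    /** throttleFirst: emit an item, then ignore everything for `window` ms. */
    static List<Integer> throttleFirst(long[] times, int[] items, long window) {
        List<Integer> out = new ArrayList<>();
        long lastEmit = -1; // -1 means nothing has been emitted yet
        for (int i = 0; i < items.length; i++) {
            if (lastEmit < 0 || times[i] - lastEmit >= window) {
                out.add(items[i]);
                lastEmit = times[i];
            }
        }
        return out;
    }

    /** debounce: emit an item only if `timeout` ms of silence follow it. */
    static List<Integer> debounce(long[] times, int[] items, long timeout) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            boolean quietAfter = i == items.length - 1
                    || times[i + 1] - times[i] >= timeout;
            if (quietAfter) {
                out.add(items[i]);
            }
        }
        return out;
    }
}
```

For items 1 through 7 arriving at 0, 5, 8, 300, 600, 603, and 1200 ms, this model yields `[4, 6, 7]` for `sample` with a 500 ms period, `[1, 5, 7]` for `throttleFirst` with a 500 ms window, and `[3, 4, 6, 7]` for `debounce` with a 10 ms timeout.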
- Buffers and windows
The following diagrams show how you could use each of these operators on the bursty Observable shown above.
- buffer
````groovy
Observable<List<Integer>> burstyBuffered = bursty.buffer(500, TimeUnit.MILLISECONDS);
````
Or you could get fancy, and collect items in buffers during the bursty periods and emit them at the end of each burst, by using the `debounce` operator to emit a buffer closing indicator to the `buffer` operator:
````groovy
// we have to multicast the original bursty Observable so we can use it
// both as our source and as the source for our buffer closing selector:
Observable<Integer> burstyMulticast = bursty.publish().refCount();
// burstyDebounced will be our buffer closing selector:
Observable<Integer> burstyDebounced = burstyMulticast.debounce(10, TimeUnit.MILLISECONDS);
// and this, finally, is the Observable of buffers we're interested in:
Observable<List<Integer>> burstyBuffered = burstyMulticast.buffer(burstyDebounced);
````
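The effect of closing a buffer whenever the source goes quiet can be modeled without RxJava. The sketch below is a simplified plain-Java illustration. The `BurstBuffers` class and `groupBursts` method are hypothetical names, and it assumes ascending millisecond timestamps and treats a gap of at least `quietMillis` between consecutive items as the end of a burst.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of "buffer until the source goes quiet". */
final class BurstBuffers {

    /**
     * Groups items into bursts. A burst ends when the gap to the next item
     * is at least `quietMillis`, or when the input ends.
     */
    static List<List<Integer>> groupBursts(long[] times, int[] items, long quietMillis) {
        List<List<Integer>> bursts = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int i = 0; i < items.length; i++) {
            current.add(items[i]);
            boolean burstEnds = i == items.length - 1
                    || times[i + 1] - times[i] >= quietMillis;
            if (burstEnds) {
                // Close the current buffer and start a fresh one.
                bursts.add(current);
                current = new ArrayList<>();
            }
        }
        return bursts;
    }
}
```

Unlike the fixed 500 ms buffers, the buffer boundaries here follow the shape of the traffic, so each emitted list corresponds to one burst.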
- window
````groovy
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
````
You could also choose to emit a new window each time you have collected a particular number of items from the source Observable:
````groovy
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);
````
- Callstack blocking as a flow-control alternative to backpressure
If the Observable, all of the operators that operate on it, and the observer that is subscribed to it, are all operating in the same thread, this effectively establishes a form of backpressure by means of callstack blocking. But be aware that many Observable operators do operate in distinct threads by default (the javadocs for those operators will indicate this).
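As a rough illustration of callstack blocking, the plain-Java sketch below runs a producer loop and its observer on the same thread and records the order of events. It is not RxJava code, and the `CallstackBlocking` class and `run` method are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

/** Hypothetical single-threaded producer/observer pair. */
final class CallstackBlocking {

    /** Emits `count` items synchronously and returns the event log. */
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        IntConsumer observer = item -> log.add("handled " + item);
        for (int i = 0; i < count; i++) {
            log.add("emit " + i);
            // This call does not return until the observer has finished,
            // so the producer cannot get ahead of the consumer.
            observer.accept(i);
        }
        return log;
    }
}
```

In the log, every "emit" entry is immediately followed by the matching "handled" entry, so at most one item is ever in flight. That guarantee disappears as soon as an operator hands items to another thread.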
- How a subscriber establishes “reactive pull” backpressure
Then, after handling this item (or these items) in `onNext()`, you can call `request()` again to instruct the `Observable` to emit another item (or items). Here is an example of a `Subscriber` that requests one item at a time from `someObservable`:
````java
someObservable.subscribe(new Subscriber<T>() {
    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onCompleted() {
        // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
        // gracefully handle error
    }

    @Override
    public void onNext(T n) {
        // do something with the emitted item "n"
        // request another item:
        request(1);
    }
});
````
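To make the request-then-emit handshake concrete without depending on RxJava, here is a minimal plain-Java model of a source that emits only what has been requested. The `PullModel`, `RangeSource`, and `Sink` names are hypothetical and are not RxJava types. The `emitting` flag is one possible way to keep a `request()` call made from inside `onNext()` from recursing.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of reactive pull; not part of RxJava. */
final class PullModel {

    /** Receives items and may ask the source for more. */
    interface Sink {
        void onNext(int item, RangeSource source);
    }

    /** Emits the integers start..start+count-1, but only on request. */
    static final class RangeSource {
        private final int end;       // exclusive upper bound
        private final Sink sink;
        private int next;            // next value to emit
        private long requested;      // outstanding, unfulfilled demand
        private boolean emitting;    // true while the drain loop is running

        RangeSource(int start, int count, Sink sink) {
            this.next = start;
            this.end = start + count;
            this.sink = sink;
        }

        void request(long n) {
            requested += n;
            if (emitting) {
                return; // the running loop below will see the new demand
            }
            emitting = true;
            try {
                while (requested > 0 && next < end) {
                    requested--;
                    sink.onNext(next++, this);
                }
            } finally {
                emitting = false;
            }
        }
    }

    /** Pulls `count` items one at a time and returns them in arrival order. */
    static List<Integer> consumeOneAtATime(int count) {
        List<Integer> seen = new ArrayList<>();
        RangeSource source = new RangeSource(0, count, (item, src) -> {
            seen.add(item);
            src.request(1); // ask for the next item only after handling this one
        });
        source.request(1);  // plays the role of request(1) in onStart()
        return seen;
    }
}
```

Because the sink never has more than one outstanding request, the source never runs ahead of it, which is the property the `Subscriber` above relies on.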
- Reactive pull backpressure isn’t magic
Let’s take a closer look at the problem of the uneven `zip`.
You have two Observables, _A_ and _B_, where _B_ is inclined to emit items more frequently than _A_. When you try to `zip` these two Observables together, the `zip` operator combines item _n_ from _A_ and item _n_ from _B_, but meanwhile _B_ has also emitted items _n_+1 to _n_+_m_. The `zip` operator has to hold on to these items so it can combine them with items _n_+1 to _n_+_m_ from _A_ as they are emitted, but meanwhile _m_ keeps growing and so the size of the buffer needed to hold on to these items keeps increasing.
You could attach a throttling operator to _B_, but this would mean ignoring some of the items _B_ emits, which might not be appropriate. What you’d really like to do is to signal to _B_ that it needs to slow down and then let _B_ decide how to do this in a way that maintains the integrity of its emissions.
The reactive pull backpressure model lets you do this. It creates a sort of active pull from the Subscriber in contrast to the normal passive push Observable behavior.
The `zip` operator as implemented in RxJava uses this technique. It maintains a small buffer of items for each source Observable, and it requests no more items from each source Observable than would fill its buffer. Each time `zip` emits an item, it removes the corresponding items from its buffers and requests exactly one more item from each of its source Observables.
(Many RxJava operators exercise reactive pull backpressure. Some operators do not need to use this variety of backpressure, as they operate in the same thread as the Observable they operate on, and so they exert a form of blocking backpressure simply by not giving the Observable the opportunity to emit another item until they have finished processing the previous one. For other operators, backpressure is inappropriate as they have been explicitly designed to deal with flow control in other ways. The RxJava javadocs for those operators that are methods of the Observable class indicate which ones do not use reactive pull backpressure and why.)
For this to work, though, Observables _A_ and _B_ must respond correctly to the `request()`. If an Observable has not been written to support reactive pull backpressure (such support is not a requirement for Observables), you can apply one of the following operators to it, each of which forces a simple form of backpressure behavior:
- onBackpressureBuffer
- maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers according to the requests they generate
- an experimental version of this operator (not available in RxJava 1.0) allows you to set the capacity of the buffer; applying this operator will cause the resulting Observable to terminate with an error if this buffer is overrun
- onBackpressureDrop
- drops emissions from the source Observable unless there is a pending request from a downstream Subscriber, in which case it will emit enough items to fulfill the request
- onBackpressureBlock (experimental, not in RxJava 1.0)
- blocks the thread on which the source Observable is operating until such time as a Subscriber issues a request for items, and then unblocks the thread only so long as there are pending requests
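The difference between the buffering and dropping strategies can be sketched in plain Java. This is a simplified model of the behavior described in the list above, not the RxJava operators themselves, and the `BackpressureRelays`, `Buffering`, and `Dropping` names are hypothetical. Each relay sits between a source that calls `emit` and a downstream consumer that calls `request`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical models of two backpressure strategies. */
final class BackpressureRelays {

    /** Buffering: nothing is lost; unrequested items wait in a queue. */
    static final class Buffering {
        final List<Integer> delivered = new ArrayList<>();
        private final Deque<Integer> queue = new ArrayDeque<>();
        private long requested;

        void emit(int item) {
            queue.add(item);
            drain();
        }

        void request(long n) {
            requested += n;
            drain();
        }

        private void drain() {
            // Deliver queued items while there is outstanding demand.
            while (requested > 0 && !queue.isEmpty()) {
                requested--;
                delivered.add(queue.poll());
            }
        }
    }

    /** Dropping: an item is delivered only if a request is pending. */
    static final class Dropping {
        final List<Integer> delivered = new ArrayList<>();
        private long requested;

        void emit(int item) {
            if (requested > 0) {
                requested--;
                delivered.add(item);
            }
            // Otherwise the item is silently discarded.
        }

        void request(long n) {
            requested += n;
        }
    }
}
```

If the source emits 1 and 2 before any request arrives, the buffering relay delivers them later, once requested, while the dropping relay discards them and delivers only items that arrive after a request. Note that the buffering relay's queue is unbounded in this sketch.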
If you do not apply any of these operators to an Observable that does not support backpressure, _and_ if either you as the Subscriber or some operator between you and the Observable attempts to apply reactive pull backpressure, you will encounter a `MissingBackpressureException` which you will be notified of via your `onError()` callback.
_**Work in progress...**_
Things that may need explaining:
- the `Producer` interface (and its `request` method)
- other new method in `Subscriber`:
* `setProducer(p)`
- how and when to support producers in custom observables & operators
* point here from the "how to make a custom operator" page; maybe also from `create` operator doc
_**Work in progress...**_
- See also
- [RxJava](https://github.com/ReactiveX/RxJava/releases/tag/0.20.0-RC1)
Copyright (c) 2016-present, RxJava Contributors.