Skip to content

Backpressure

David Gross edited this page Aug 1, 2014 · 40 revisions

Work in progress...

In RxJava it is not difficult to get into a situation in which an Observable is emitting items more rapidly than an operator or subscriber can consume them. This presents the problem of what to do with such a growing backlog of unconsumed items.

For example, imagine using the zip operator to zip together two infinite Observables, one of which emits items twice as frequently as the other. The zip operator, to perform as advertised, will have to maintain an ever-expanding buffer of items emitted by the faster Observable to eventually combine with items emitted by the slower one. This could cause RxJava to seize an unwieldy amount of system resources.

There are a variety of strategies with which you can exercise flow control and backpressure in RxJava. This page explains some of these strategies, and also shows you how you can design your own Observables and Observable operators to respect requests for flow control.

Useful operators that avoid the need for backpressure

Your first line of defense against the problems of over-producing Observables is the ordinary set of Observable operators.

Throttling

Operators like sample( ) or throttleLast( ), throttleFirst( ), and throttleWithTimeout( ) or debounce( ) allow you to regulate the rate at which an Observable emits items.

We might, for example, have used one of these operators on each of the two Observables we intended to zip together in the conundrum mentioned earlier, and this would have solved our problem. But the behavior of the resulting zip would also have been different. It would no longer necessarily zip together the nth item from each Observable sequentially.

Buffers and windows

You could also use an operator like buffer( ) or window( ) to collect items from the over-producing Observable and then emit them, less-frequently, as collections (or Observables) of items. The slow consumer can then decide whether to process only one particular item from each collection, to process some combination of those items, or to schedule work to be done on each item in the collection, as appropriate.

Callstack blocking as a flow-control alternative to backpressure

Another way of handling an overproductive Observable is to block the callstack (parking the thread that governs the overproductive Observable). This has the disadvantage of going against the “reactive” and non-blocking model of Rx. However this can be a viable option if the problematic Observable is on a thread that can be blocked safely. Currently RxJava does not expose any operators to facilitate this.

Backpressure isn’t magic

Backpressure doesn’t make the problem of an overproducing Observable or an underconsuming Subscriber go away. It just moves the problem up the chain of operators to a point where it can be handled better.

Let’s take a closer look at the problem of the uneven zip.

You have two Observables, A and B, where B is inclined to emit items more frequently as A. When you try to zip these two Observables together, the zip operator combines item n from A and item n from B, but meanwhile B has also emitted items n+1 to n+m. The zip operator has to hold on to these items so it can combine them with items n+1 to n+m from A as they are emitted, but meanwhile m keeps growing and so the size of the buffer needed to hold on to these items keeps increasing.

You could attach a throttling operator to B, but this would mean ignoring some of the items B emits, which might not be appropriate. What you’d really like to do is to signal to B that it needs to slow down and then let B decide how to do this in a way that maintains the integrity of its emissions.

The reactive pull backpressure model lets you do this. The Subscriber interface has a method called request(_n_) that lets it ask for a specified number of items from the Observable the Subscriber is subscribed to. A Subscriber can call this method inside its onStart() handler to initiate the emission of items, and in its onNext() handler to keep the flow of emissions coming. This creates a sort of active pull from the Subscriber in contrast to the normal passive push Observable behavior.

The zip operator in RxJava uses this technique. It maintains a small buffer of items, and requests no more from its source Observables than would fill that buffer. Each time zip emits an item, it removes that item from its buffer and requests exactly one more item from each of its source Observables.

(Most RxJava operators exercise this variety of backpressure. Some operators do not need to use this variety of backpressure, as they operate in the same thread as the Observable they operate on, and so they exert a form of blocking backpressure simply by not giving the Observable the opportunity to emit another item until they have finished processing the previous one. For other operators, backpressure is inappropriate as they have been explicitly designed to deal with flow control in other ways. The RxJava javadocs for those operators that are methods of the Observable class indicate which ones do not use standard backpressure and why.)

For this to work, though, A and B (or the Observables that result from operators applied to them) must respond correctly to the request(). If an Observable has not been written to support backpressure (such support is not a requirement for Observables), you can apply one of the following operators to it, each of which forces a simple form of backpressure behavior:

onBackpressureBuffer
maintains a buffer of all emissions from the source Observable and emits them to downstream Subscribers according to the requests they generate
onBackpressureDrop
drops emissions from the source Observable unless there is a pending request from a downstream Subscriber, in which case it will emit enough items to fulfill the request

If you do not apply either of these operators to an Observable that does not support backpressure, and if either you as the Subscriber or some operator between you and the Observable attempts to apply backpressure, you will encounter a MissingBackpressureException which you will be notified of via your onError() callback.

How to request backpressure from a subscriber

When you subscribe to an Observable with a Subscriber, you can request backpressure by calling Subscriber.request(n) in the Subscriber’s onStart() method, where n is the maximum number of items you want the Observable to emit before the next request() call.

Then, after handling this item (or these items) in onNext(), you can call request() again to instruct the Observable to emit another item (or items). Here is an example of a Subscriber that requests one item at a time from someObservable:

someObservable.subscribe(new Subscriber<t>() {
    @Override
    public void onStart() {
      request(1);
    }

    @Override
    public void onCompleted() {
      // gracefully handle sequence-complete
    }

    @Override
    public void onError(Throwable e) {
      // gracefully handle error
    }

    @Override
    public void onNext(t n) {
      // do something with the emitted item "n"
      // request another item:
      request(1);
    }
});

Work in progress...

Things that may need explaining:

  • the Producer interface (and its request method)
  • other new method in Subscriber:
    • setProducer(p)
  • how and when to support producers in custom observables & operators
    • point here from the "how to make a custom operator" page; maybe also from create operator doc

Work in progress...

See also

Clone this wiki locally