Skip to content

2.x: Design.md +extension +fusion #3980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 17, 2016
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 242 additions & 8 deletions DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,18 +325,158 @@ In the addition of the previous rules, an operator for `Flowable`:

### Creation

Creation of the various types should be exposed through factory methods that provide safe construction.
Unlike RxJava 1.x, 2.x base classes are to be abstract, stateless and generally no longer wrap an `OnSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain more about the rationale behind having abstract base classes that are stateless?
Is that an optimization?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It uses less memory.


Instead of the indirection of an `OnSubscribe` and `lift`, operators are to be implemented by extending the base classes. For example, the `map`
Copy link

@stealthcode stealthcode Jun 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I see where you are going with this change. I don't think this has been discussed before. If I'm understanding correctly this is a departure from the Operator interface as an adapter between subscribers and instead just giving each operator method the full context of the previous observable.

I would rather find an alternative to class inheritance. What are the advantages of this approach? Also, why is FlowableMap declared final?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If Java had extension methods, you didn't have to do this inheritance. FlowableMap is final to prevent unintended extension.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will not work with Consumable because each operator is not reusable for every implementation of a Consumable for a given observer/subscriber type. I do not agree that this should be the way to provide extensions.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cross base-type operator reuse can't work. Extending the base types with new methods via inheritance doesn't work. It requires returning <? extends BaseType> and the compiler can't properly chain calls with them without target-typing each stage.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I am understanding your terminology correctly, you are saying that a "cross base-type" operator is an operator which functions for both an Observable and a Flowable for instance. Am I correct? I'm suggesting that an operator which knows how to negotiate backpressure from a Subscriber should be capable of supporting any kind of Consumable that functions using Subscriber. This means that there can be many types which implement Consumable<Observer<T>> and any operator that knows how to work with an Observer can be reused.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For Flowable base class type we have the rs.Publisher as the base interface type. In principle, you could define base interfaces for Observable, Single and Completable, but I don't see why unless you want to encourage other libraries to implement them differently than in RxJava.

For Flowable, the Operator<T, R> == Function<Subscriber<R>, Subscriber<T>>; the OnSubscribe<T> == Consumer<Subscriber<T>> ~ rs.Publisher<T>.

The plan is that standard operator implementations, such as FlowableMap may take an rs.Publisher<T> at least (and Flowable<T> at most), but you won't be able to combine FlowableMap and ObservableMap because the actual output class differs and needs different calls to onSubscribe.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but you won't be able to combine FlowableMap and ObservableMap because the actual output class differs and needs different calls to onSubscribe.

@akarnokd I am telling you that is not what I am saying! Read my comments carefully! I am saying that a Map operator can be written that works over all Flowables AND a separate map operator can be written that works over all Observables! I really don't know why you don't read my comments.

but I don't see why unless you want to encourage other libraries to implement them differently than in RxJava.

We talked about this when you tried to implement Consumable. I thought you were on board with creating a base interface or class for all kinds of Observables (and likewise a different base for Completables and Singles). Is there something I'm missing?

The plan

Who's plan? I know that I'm pretty confused about the perceived benefits here. It seems like a micro optimization to save an allocation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The drawback of arguing in a design document is that there is no proof that things actually work out properly and not just locally. If we'd work from code PRs instead, we'd be sure the concepts actually compile and how far their effects go.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is why I showed that it can be done in a proof of concept project. That you reviewed.

operator will look like this:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stevegury Was the SyncGenerator wording supposed to be a rename of the SyncOnSubscribe?


```java
Flowable.create(SyncGenerator generator)
public final class FlowableMap<T, R> extends Flowable<R> {

final Flowable<? extends T> source;

final Function<? super T, ? extends R> mapper;

public FlowableMap(Flowable<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}

@Override
protected void subscribeActual(Subscriber<? super R> subscriber) {
source.subscribe(new FlowableMapSubscriber<T, R>(subscriber, mapper));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be source.subscribeActual?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would bypass the plugin hook in source.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like @stealthcode, I was under the impression that we had decided to go with the Consumable interface.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you implement it, then go with Consumable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not how collaborating on a team works @akarnokd.

}

static final class FlowableMapSubscriber<T, R> implements Subscriber<T>, Subscription {
// ...
}
}
```

Since Java still doesn't have extension methods, "adding" more operators can only happen through helper methods such as `lift(C -> C)` and `compose(R -> P)` where `C` is the default consumer type (i.e., `rs.Subscriber`), `R` is the base type (i.e., `Flowable`) and `P` is the base interface (i.e., `rs.Publisher`). As before, the library itself may gain or lose standard operators and/or overloads through the same community process.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"adding" more operators can only happen through

This version of compose seems to be very similar to the proposed extend method on Consumable as suggested in my prototype, as merged on the Observable.extend and as we discussed in prior pull requests. Is the intent with the compose overload to replace the extend method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compose() is different from extend because it gives the full base type to the function. Substituting R with Flowable the signature looks like this:

public final <U> Flowable<U> compose(Function<? super Flowable<T>, ? extends Publisher<U>> composer);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I would rather only have one method for "extending" functionality? Is that also your goal? In which case we should discuss the merits of compose as a replacement for extend.

For comparison I'm including the signature of extend.

<O2, X extends Consumable<O2>> X extend(Function<Consumer<O>, X> f)

Which in the case of a Flowable would be as follows (note that I have replaced the Consumable<O2> with Publisher<U>)

<O2, X extends<Publisher<U>> X extend(Function<Consumer<Subscriber<? super T>>, X> f)

So the only substantial difference between compose and extend is that with compose the function consumes a concrete Flowable type and with extend it consumes an OnSubscribe.

I think my concern is that the compose functions must know the absolute type of the thing that you are converting from (or at least a base class/interface). This makes it more restrictive than what extend allows, converting from any variation of a Flowable by virtue of unwrapping and accessing the internal callback. So composition functions would be case specific, and would not be reusable for different types of flowable, observable, single, or completable.

Note that I am NOT proposing making conversion functions from a flowable that also work on an observable for instance. What I am proposing is that if you have different variations of an Observable that the same logic could be use to convert it to a different thing. I think that there is the potential for better code reuse with extend. Also my intent is that extending functions for the core RxJava types (i.e. the Flowable, Observable, Single, Completable and all of there variants which implement their respective Consumable<S> for their subscriber/observer type) be written and packaged with RxJava. But this would not be limited to just internal apis as the pattern could be useful for libraries which write their own Consumable.


In concert, `create(OnSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap(Publisher<T>)` static method.

(*The unfortunate effect of `create` in 1.x was the ignorance of the Observable contract and beginner's first choice as an entry point. We can't eliminate this path since `rs.Publisher` is a single method functional interface that can be implemented just as badly.*)

Therefore, new standard factory methods will try to address the common entry point requirements.

The `Flowable` will contain the following `create` methods:

- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one
- `create(AsyncOnSubscribe<T, S>)`: batch-create signals based on request patterns
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources

The `Observable` will contain the following `create` methods:

- `create(SyncGenerator<T, S>)`: safe, synchronous generation of signals, one-by-one
- `create(Consumer<? super FlowEmitter<T>>)`: relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
- `createSingle(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)
- `createEmpty(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources

The `Single` will contain the following `create` method:

- `create(Consumer<? super SingleEmitter<T>>)`: relay a single value or error from other reactive sources (i.e., addListener callbacks)

The `Completable` will contain the following `create` method:

Flowable.create(AsyncGenerator generator)
- `create(Consumer<? super CompletionEmitter>)`: signal a completion or error from valueless reactive sources

Observable<T>.create(OnSubscribe<Observer<T>> onSubscribe)

Single<T>.create(OnSubscribe<Single.Subscriber<T>> onSubscribe)
The first two `create` methods take an implementation of an interface which provides state and the generator methods:

Completable<T>.create(OnSubscribe<Completable.Subscriber<T>> onSubscribe)
```java
interface SyncGenerator<T, S> {

S createState();

S generate(S state, Observer<T> output);

void disposeState(S state);
}

interface AsyncGenerator<T, S> {

S createState();

S generate(S state, long requested, Observer<Observable<T>> output);

void disposeState(S state);
}
```

These latter three `create` methods will provide the following interaction interfaces to the `java.util.function.Consumer`:

```java
interface SingleEmitter<T> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You deleted the mention of how to create a Single. Is the intent that a Consumable<SingleEmitter<T>> will replace the Single<T>.create(OnSubscribe<Single.Subscriber<T>> onSubscribe)? What will be used to replace the creation of a Completable as mentioned: Completable<T>.create(OnSubscribe<Completable.Subscriber<T>> onSubscribe)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Detailed.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm not understanding this correctly. Could you please give us an example of how this would work in practice?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// usage
Consumer<SingleEmitter<T>> c = e -> {
    AutoCloseable close = someAPI.performAsync(new Callback<T>() {
        @Override
        public void onData(T data) {
            e.complete(data);
        }

        @Override
        public void onFailure(Throwable ex) {
            e.fail(ex);
        }
    });
    e.setDisposable(() -> {
        try {
            close.close();
        } catch (Exception ex) {
            RxJavaPlugins.onError(ex);
        }
    });
};

Flowable.create(c).subscribe(System.out::println);


complete(T value);

fail(Throwable error);

stop();

setDisposable(Disposable d);

}

interface FlowEmitter<T> {

void next(T value);

void fail(Throwable error);

void complete();

void stop();

setDisposable(Disposable d);

enum BackpressureHandling {
IGNORE,
ERROR,
DROP,
LATEST,
BUFFER
}

void setBackpressureHandling(BackpressureHandling mode);

}

interface CompletableEmitter<T> {

complete();

fail(Throwable error);

stop();

setDisposable(Disposable d);

}

```

By extending the base classes, operator implementations would loose the tracking/wrapping features of 1.x. To avoid this, the methods `subscribe(C)` will be final and operators have to implement a protected `subscribeActual` (or any other reasonable name).

```java
@Override
public final void subscribe(Subscriber<? super T> s) {
subscribeActual(hook.onSubscribe(s));
}

protected abstract void subscribeActual(Subscriber<? super T> s);
```

Assembly-time hooks will be moved into the individual standard methods on the base types:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has been no previous mention of the hooks in this document. Is it necessary to include this in the design?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you otherwise explain the need to make subscribe final and have a subscribeActual instead?


```java
public final Flowable<R> map(Function<? super T, ? extends R> mapper) {
return hook.onAssembly(new FlowableMap<T, R>(this, mapper));
}
```

### Terminal behavior
Expand All @@ -359,6 +499,100 @@ This section contains current design work which needs more discussion and elabor

We are investigate a base interface (similar to `Publisher`) for the `Observable`, `Single`, and `Completable` (currently referred to as `Consumable` or `ConsumableObservable`). This would empower library owners and api developers to implement their own type of `Observable`, `Single`, or `Completable` without extending the class. This would result in a change the type signatures of `subscribe` as well as any operator that operates over an `Observable`, `Single`, or `Completable` to accept a more generic type (i.e. `ConsumableObservable`). For more information see the proof of concept project [Consumable](https://github.com/stealthcode/Consumable).

#### Fusion (To be confirmed)
#### Fusion

Operator fusion exploits the declarative nature of building flows; the developer specifies the "what", "where" and "when", the library then tries to optimize the "how".

There are two main levels of operator fusion: *macro* and *micro*.

##### Macro-fusion

Macro fusion deals with the higher level view of the operators, their identity and their combination (mostly in the form of subsequence). This is partially an internal affair of the operators, triggered by the downstream operator and may work with several cases. Given an operator application pair `a().b()` where `a` could be a source or an intermediate operator itself, when the application of `b` happens in assembly time, the following can happen:

- `b` identifies `a` and decides to not apply itself. Example: `empty().flatMap()` is functionally a no-op
- `b` identifies `a` and decides to apply a different, conventional operator. Example: `just().subscribeOn()` is turned into `just().observeOn()`.
- `b` decides to apply a new custom operator, combining and inlining existing behavior. Example: `just().subscribeOn()` internally goes to `ScalarScheduledPublisher`.
- `a` is `b` and the two operator's parameter set can be combined into a single application. Example: `filter(p1).filter(p2)` combined into `filter(p1 && p2)`

Participating in the macro-fusion externally is possible by implementing a marker interface when extending `Flowable`. Two kinds of interfaces are available:

- `java.util.Callable`: the Java standard, throwing interface, indicating the single value has to be extracted in subscription time (or later).
- `ScalarCallable`: to indicate the single value can be safely extracted during assembly time and used/inlined in other operators:

```java
interface ScalarCallable<T> extends java.util.Callable<T> {
@Override
T call();
}
```

`ScalarCallable` is also `Callable` and thus its value can be extracted practically anytime. For convenience (and for sense), `ScalarCallable` overrides and hides the superclass' `throws Exception` clause - throwing during assembly time is likely unreasonable for scalars.

Since Reactive-Streams doesn't allow `null`s in the value flow, we have the opportunity to define `ScalarCallable`s and `Callable`s returning `null` should be considered as an empty source - allowing operators to dispatch on the type `Callable` first then branch on the nullness of `call()`.

Interoperating with other libraries, at this level is possible. Reactor-Core uses the same pattern and the two libraries can work with each other's `Publisher+Callable` types. Unfortunately, this means subscription-time only fusion as `ScalarCallable`s live locally in each library.

##### Micro-fusion

Micro-fusion goes a step deeper and tries to reuse internal structures, mostly queues, in operator pairs, saving on allocation and sometimes on atomic operations. It's property is that, in a way, subverts the standard Reactive-Streams protocol between subsequent operators that both support fusion. However, from the outside world's view, they still work according to the RS protocol.

Currently, two main kinds of micro-fusion opportunities are available.

###### 1) Conditional Subscriber

This extends the RS `Subscriber`interface with an extra method: `boolean tryOnNext(T value)` and can help avoiding small request amounts in case an operator didn't forward but dropped the value. The canonical use is for the `filter()` operator where if the predicate returns false, the operator has to request 1 from upstream (since the downstream doesn't know there was a value dropped and thus not request itself). Operators wanting to participate in this fusion have to implement and subscribe with an extended Subscriber interface:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you clarify this with an example.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of what? Rsc has a bunch of implementations with this pattern, for example, filter.


```java
interface ConditionalSubscriber<T> {
boolean tryOnNext(T value);
}

//...
@Override
protected void subscribeActual(Subscriber<? super T> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new FilterConditionalSubscriber<>(s, predicate));
} else {
source.subscribe(new FilterRegularSubscriber<>(s, predicate));
}
}
```

(Note that this may lead to extra case-implementations in operators that have some kind of queue-drain emission model.)

###### 2) Queue-fusion

The second category is when two (or more) operators share the same underlying queue and each append activity at the exit point (i.e., poll()) of the queue. This can work in two modes: synchronous and asynchronous.

In synchronous mode, the elements of the sequence is already available (i.e., a fixed `range()` or `fromArray()`, or can be synchronously calculated in a pull fashion in `fromIterable`. In this mode, the requesting and regular onError-path is bypassed and is forbidden. Sources have to return null from `pull()` and false from `isEmpty()` if they have no more values and throw from these methods if they want to indicate an exceptional case.

In asynchronous mode, elements may become available at any time, therefore, `pull` returning null, as with regular queue-drain, is just the indication of temporary lack of source values. Completion and error still has to go through `onComplete` and `onError` as usual, requesting still happens as usual but when a value is available in the shared queue, it is indicated by an `onNext(null)` call. This can trigger a chain of `drain` calls without moving values in or out of different queues.

In both modes, `cancel` works and behaves as usual.

Since this fusion mode is an optional extension, the mode switch has to be negotiated and the shared queue interface established. Operators already working with internal queues then can, mostly, keep their current `drain()` algorithm. Queue-fusion has its own interface and protocol built on top of the existing `onSubscribe`-`Subscription` rail:

```java
interface QueueSubscription<T> implements Queue<T>, Subscription {
int NONE = 0;
int SYNC = 1;
int ASYNC = 2;
int ANY = SYNC | ASYNC;
int BOUNDARY = 4;

int requestFusion(int mode);
}
```

For performance, the mode is an integer bitflags setup, called early during subscription time, and allows negotiating the fusion mode. Usually, producers can do only one mode and consumers can do both mode. Because fused, intermediate operators attach logic (which is many times user-callback) to the exit point of the queue interface (poll()), it may change the computation location of those callbacks in an unwanted way. The flag `BOUNDARY` is added by consumers indicating that they will consume the queue over an async boundary. Intermediate operators, such as `map` and `filter` then can reject the fusion in such sequences.

Since RxJava 2.x is still JDK 6 compatible, the `QueueSubscription` can't itself default unnecessary methods and implementations are required to throw `UnsupportedOperationException` for `Queue` methods other than the following:

- `poll()`
- `isEmpty()`
- `clear()`
- `size()`

Even though other modern libraries also define this interface, they live in local packages and thus non-reusable without dragging in the whole library. Therefore, until externalized and standardized, cross-library micro-fusion won't happen.

We intend to enable operator fusion, but we don't have any specification yet. Nothing we do here should prevent the implementation of fusion.
A consequence of the extension of the `onSubscribe`-`Subscription` rail is that intermediate operators are no longer allowed to pass an upstream `Subscription` directly to its downstream `Subscriber.onSubscribe`. Doing so is likely to have the fused sequence skip the operator completely, losing behavior or causing runtime exceptions. Since RS `Subscriber` is an interface, operators can simply implement both `Subscriber` and `Subscription` on themselves, delegating the `request` and `cancel` calls to the upstream and calling `child.onSubscribe(this)`.