Skip to content

Commit dd39461

Browse files
committed
Explain, rename, etc.
1 parent a4c1825 commit dd39461

File tree

1 file changed

+58
-8
lines changed

1 file changed

+58
-8
lines changed

DESIGN.md

Lines changed: 58 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -325,24 +325,61 @@ In the addition of the previous rules, an operator for `Flowable`:
325325

326326
### Creation
327327

328-
Unlike RxJava 1.x, 2.x base classes are abstract and stateless and generally no longer wrap an `OnSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes.
328+
Unlike RxJava 1.x, 2.x base classes are to be abstract, stateless and generally no longer wrap an `OnSubscribe` callback - this saves allocation in assembly time without limiting the expressiveness. Operator methods and standard factories still live as final on the base classes.
329+
330+
Instead of the indirection of an `OnSubscribe` and `lift`, operators are to be implemented by extending the base classes. For example, the `map`
331+
operator will look like this:
332+
333+
```java
334+
public final class FlowableMap<T, R> extends Flowable<R> {
335+
336+
final Flowable<? extends T> source;
337+
338+
final Function<? super T, ? extends R> mapper;
339+
340+
public FlowableMap(Flowable<? extends T> source, Function<? super T, ? extends R> mapper) {
341+
this.source = source;
342+
this.mapper = mapper;
343+
}
344+
345+
@Override
346+
protected void subscribeActual(Subscriber<? super R> subscriber) {
347+
source.subscribe(new FlowableMapSubscriber<T, R>(subscriber, mapper));
348+
}
349+
350+
static final class FlowableMapSubscriber<T, R> implements Subscriber<T>, Subscription {
351+
// ...
352+
}
353+
}
354+
```
329355

330356
Since Java still doesn't have extension methods, "adding" more operators can only happen through helper methods such as `lift(C -> C)` and `compose(R -> P)` where `C` is the default consumer type (i.e., `rs.Subscriber`), `R` is the base type (i.e., `Flowable`) and `P` is the base interface (i.e., `rs.Publisher`). As before, the library itself may gain or lose standard operators and/or overloads through the same community process.
331357

332-
In concert, `create(OnSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap()` static method.
358+
In concert, `create(OnSubscribe)` will not be available; standard operators extend the base types directly. The conversion of other RS-based libraries will happen through the `Flowable.wrap(Publisher<T>)` static method.
333359

334360
(*The unfortunate effect of `create` in 1.x was the ignorance of the Observable contract and beginner's first choice as an entry point. We can't eliminate this path since `rs.Publisher` is a single method functional interface that can be implemented just as badly.*)
335361

336362
Therefore, new standard factory methods will try to address the common entry point requirements:
337-
- `create(SyncOnSubscribe)` to safe, synchronous generation of signals, one-by-one
338-
- `create(AsyncOnSubscribe)` to batch-create signals based on request patterns
363+
- `create(SyncGenerator<T, S>)` to safe, synchronous generation of signals, one-by-one
364+
- `create(AsyncGenerator<T, S>)` to batch-create signals based on request patterns
339365
- `create(Consumer<? super SingleEmitter<T>>)` to relay a single value or error from other reactive sources (i.e., addListener callbacks)
340-
- `create(Consumer<? super FlowEmitter<T>>)` to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
366+
- `create(Consumer<? super FlowEmitter<T>>)` to relay multiple values or error from multi-valued reactive-sources (i.e., button-clicks) while also give flow control options right there (buffer, drop, error, etc.).
367+
- `create(Consumer<? super CompletionEmitter>)` signal a completion or error from valueless reactive sources
368+
369+
The following table lists where these create methods will be available in respect of the base types:
370+
371+
| Method | Flowable | Observable | Single | Completable |
372+
|--------|----------|------------|--------|-------------|
373+
| `create(SyncGenerator<T, S>)` | Yes | Yes | No | No |
374+
| `create(AsyncOnSubscribe<T, S>)` | Yes | No | No | No |
375+
| `create(Consumer<? super SingleEmitter<T>>)` | Yes | Yes | Yes | No |
376+
| `create(Consumer<? super FlowEmitter<T>>)` | Yes | Yes | Yes | No |
377+
| `create(Consumer<? super CompletionEmitter)>` | Yes | Yes | No | Yes |
341378

342379
The first two `create` methods take an implementation of an interface which provides state and the generator methods:
343380

344381
```java
345-
interface SyncOnSubscribe<T, S> {
382+
interface SyncGenerator<T, S> {
346383

347384
S createState();
348385

@@ -351,7 +388,7 @@ interface SyncOnSubscribe<T, S> {
351388
void disposeState(S state);
352389
}
353390

354-
interface AsyncOnSubscribe<T, S> {
391+
interface AsyncGenerator<T, S> {
355392

356393
S createState();
357394

@@ -361,7 +398,7 @@ interface AsyncOnSubscribe<T, S> {
361398
}
362399
```
363400

364-
These latter two `create` methods will provide the following interaction interfaces to the `java.util.function.Consumer`:
401+
These latter three `create` methods will provide the following interaction interfaces to the `java.util.function.Consumer`:
365402

366403
```java
367404
interface SingleEmitter<T> {
@@ -399,6 +436,19 @@ interface FlowEmitter<T> {
399436
void setBackpressureHandling(BackpressureHandling mode);
400437

401438
}
439+
440+
interface CompletableEmitter<T> {
441+
442+
complete();
443+
444+
fail(Throwable error);
445+
446+
stop();
447+
448+
setDisposable(Disposable d);
449+
450+
}
451+
402452
```
403453

404454
By extending the base classes, operator implementations would loose the tracking/wrapping features of 1.x. To avoid this, the methods `subscribe(C)` will be final and operators have to implement a protected `subscribeActual` (or any other reasonable name).

0 commit comments

Comments
 (0)