-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Commit 66ec579
committed
rx.Single
Adds `rx.Single` as an "Observable Future" for representing work with a single return value.
See #1594 rx.Future/Task/Async/Single
This provides a type similar to `Future` in that it represents a scalar unit of work, but it is lazy like an `Observable` and many `Single`s can be combined into an `Observable` stream. Note how `Single.zip` returns `Single<R>` whereas `Single.merge` returns `Observable<R>`.
Examples of using this class:
```java
import rx.Observable;
import rx.Single;
public class TaskExamples {
public static void main(String... args) {
// scalar synchronous value
Single<String> t1 = Single.create(t -> {
t.onSuccess("Hello World!");
});
// scalar synchronous value using helper method
Single<Integer> t2 = Single.just(1);
// synchronous error
Single<String> error = Single.create(t -> {
t.onError(new RuntimeException("failed!"));
});
// executing
t1.subscribe(System.out::println);
t2.subscribe(System.out::println);
error.subscribe(System.out::println, e -> System.out.println(e.getMessage()));
// scalar Singles for request/response like a Future
getData(1).subscribe(System.out::println);
// combining Tasks into another Task
Single<String> zipped = Single.zip(t1, t2, (a, b) -> a + " -- " + b);
// combining Singles into an Observable stream
Observable<String> merged = Single.merge(t1, t2.map(String::valueOf), getData(3));
Observable<String> mergeWith = t1.mergeWith(t2.map(String::valueOf));
zipped.subscribe(v -> System.out.println("zipped => " + v));
merged.subscribe(v -> System.out.println("merged => " + v));
mergeWith.subscribe(v -> System.out.println("mergeWith => " + v));
}
/**
* Example of an async scalar execution using Single.create
* <p>
* This shows the lazy, idiomatic approach for Rx exactly like an Observable except scalar.
*
* @param arg
* @return
*/
public static Single<String> getData(int arg) {
return Single.create(s -> {
new Thread(() -> {
try {
Thread.sleep(500);
} catch (Exception e) {
e.printStackTrace();
}
// deliver value
s.onSuccess("Data=" + arg);
}).start();
});
}
}
```1 parent f956293 commit 66ec579Copy full SHA for 66ec579
File tree
Expand file treeCollapse file tree
3 files changed
+2304
-0
lines changedFilter options
- src
- main/java/rx
- test/java/rx
Expand file treeCollapse file tree
3 files changed
+2304
-0
lines changed
0 commit comments