Skip to content

Commit 7194f3e

Browse files
authored
2.x: Improved XSubject JavaDocs (#5802)
* 2.x: Improved XSubject JavaDocs * Remove "still"
1 parent f00533c commit 7194f3e

File tree

11 files changed

+548
-45
lines changed

11 files changed

+548
-45
lines changed

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
* given {@code Subscription} being cancelled immediately.
8787
* <p>
8888
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
89-
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
89+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
9090
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
9191
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
9292
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).

src/main/java/io/reactivex/subjects/AsyncSubject.java

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,86 @@
2424
import io.reactivex.plugins.RxJavaPlugins;
2525

2626
/**
27-
* An Subject that emits the very last value followed by a completion event or the received error to Observers.
28-
*
29-
* <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls
27+
* A Subject that emits the very last value followed by a completion event or the received error to Observers.
28+
* <p>
29+
* This subject does not have a public constructor by design; a new empty instance of this
30+
* {@code AsyncSubject} can be created via the {@link #create()} method.
31+
* <p>
32+
* Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
33+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
34+
* as parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
35+
* {@link NullPointerException} being thrown and the subject's state is not changed.
36+
* <p>
37+
* Since an {@code AsyncSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
38+
* <p>
39+
* When this {@code AsyncSubject} is terminated via {@link #onError(Throwable)}, the
40+
* last observed item (if any) is cleared and late {@link io.reactivex.Observer}s only receive
41+
* the {@code onError} event.
42+
* <p>
43+
* The {@code AsyncSubject} caches the latest item internally and it emits this item only when {@code onComplete} is called.
44+
* Therefore, it is not recommended to use this {@code Subject} with infinite or never-completing sources.
45+
* <p>
46+
* Even though {@code AsyncSubject} implements the {@code Observer} interface, calling
47+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
48+
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
49+
* after the {@code AsyncSubject} reached its terminal state will result in the
50+
* given {@code Disposable} being disposed immediately.
51+
* <p>
52+
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
53+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
54+
* through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
55+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
56+
* consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
57+
* The implementation of onXXX methods are technically thread-safe but non-serialized calls
3058
* to them may lead to undefined state in the currently subscribed Observers.
59+
* <p>
60+
* This {@code AsyncSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
61+
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read the very last observed value -
62+
* after this {@code AsyncSubject} has been completed - in a non-blocking and thread-safe
63+
* manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}.
64+
* <dl>
65+
* <dt><b>Scheduler:</b></dt>
66+
* <dd>{@code AsyncSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
67+
* the {@code Observer}s get notified on the thread where the terminating {@code onError} or {@code onComplete}
68+
* methods were invoked.</dd>
69+
* <dt><b>Error handling:</b></dt>
70+
* <dd>When the {@link #onError(Throwable)} is called, the {@code AsyncSubject} enters into a terminal state
71+
* and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission,
72+
* if one or more {@code Observer}s dispose their respective {@code Disposable}s, the
73+
* {@code Throwable} is delivered to the global error handler via
74+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s
75+
* cancel at once).
76+
* If there were no {@code Observer}s subscribed to this {@code AsyncSubject} when the {@code onError()}
77+
* was called, the global error handler is not invoked.
78+
* </dd>
79+
* </dl>
80+
* <p>
81+
* Example usage:
82+
* <pre><code>
83+
* AsyncSubject&lt;Object&gt; subject = AsyncSubject.create();
84+
*
85+
* TestObserver&lt;Object&gt; to1 = subject.test();
86+
*
87+
* to1.assertEmpty();
88+
*
89+
* subject.onNext(1);
90+
*
91+
* // AsyncSubject only emits when onComplete was called.
92+
* to1.assertEmpty();
3193
*
94+
* subject.onNext(2);
95+
* subject.onComplete();
96+
*
97+
* // onComplete triggers the emission of the last cached item and the onComplete event.
98+
* to1.assertResult(2);
99+
*
100+
* TestObserver&lt;Object&gt; to2 = subject.test();
101+
*
102+
* // late Observers receive the last cached item too
103+
* to2.assertResult(2);
104+
* </code></pre>
32105
* @param <T> the value type
33106
*/
34-
35107
public final class AsyncSubject<T> extends Subject<T> {
36108

37109
@SuppressWarnings("rawtypes")

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,11 @@
3636
* a new non-empty instance can be created via {@link #createDefault(Object)} (named as such to avoid
3737
* overload resolution conflict with {@code Observable.create} that creates an Observable, not a {@code BehaviorSubject}).
3838
* <p>
39-
* Since the {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
39+
* Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
4040
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
4141
* default initial values in {@link #createDefault(Object)} or as parameters to {@link #onNext(Object)} and
42-
* {@link #onError(Throwable)}.
42+
* {@link #onError(Throwable)}. Such calls will result in a
43+
* {@link NullPointerException} being thrown and the subject's state is not changed.
4344
* <p>
4445
* Since a {@code BehaviorSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
4546
* <p>
@@ -83,11 +84,11 @@
8384
* Even though {@code BehaviorSubject} implements the {@code Observer} interface, calling
8485
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
8586
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
86-
* after the {@code BehaviorSubjecct} reached its terminal state will result in the
87+
* after the {@code BehaviorSubject} reached its terminal state will result in the
8788
* given {@code Disposable} being disposed immediately.
8889
* <p>
8990
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
90-
* is still required to be serialized (called from the same thread or called non-overlappingly from different threads
91+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
9192
* through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
9293
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
9394
* consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).

src/main/java/io/reactivex/subjects/CompletableSubject.java

Lines changed: 54 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,61 @@
2424
/**
2525
* Represents a hot Completable-like source and consumer of events similar to Subjects.
2626
* <p>
27-
* All methods are thread safe. Calling onComplete multiple
28-
* times has no effect. Calling onError multiple times relays the Throwable to
29-
* the RxJavaPlugins' error handler.
27+
* This subject does not have a public constructor by design; a new non-terminated instance of this
28+
* {@code CompletableSubject} can be created via the {@link #create()} method.
3029
* <p>
31-
* The CompletableSubject doesn't store the Disposables coming through onSubscribe but
32-
* disposes them once the other onXXX methods were called (terminal state reached).
30+
* Since the {@code CompletableSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
31+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
32+
* as parameters to {@link #onError(Throwable)}.
33+
* <p>
34+
* Even though {@code CompletableSubject} implements the {@code CompletableObserver} interface, calling
35+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
36+
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
37+
* after the {@code CompletableSubject} reached its terminal state will result in the
38+
* given {@code Disposable} being disposed immediately.
39+
* <p>
40+
* All methods are thread safe. Calling {@link #onComplete()} multiple
41+
* times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to
42+
* the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler.
43+
* <p>
44+
* This {@code CompletableSubject} supports the standard state-peeking methods {@link #hasComplete()},
45+
* {@link #hasThrowable()}, {@link #getThrowable()} and {@link #hasObservers()}.
46+
* <dl>
47+
* <dt><b>Scheduler:</b></dt>
48+
* <dd>{@code CompletableSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
49+
* the {@code CompletableObserver}s get notified on the thread where the terminating {@code onError} or {@code onComplete}
50+
* methods were invoked.</dd>
51+
* <dt><b>Error handling:</b></dt>
52+
* <dd>When the {@link #onError(Throwable)} is called, the {@code CompletableSubject} enters into a terminal state
53+
* and emits the same {@code Throwable} instance to the last set of {@code CompletableObserver}s. During this emission,
54+
* if one or more {@code CompletableObserver}s dispose their respective {@code Disposable}s, the
55+
* {@code Throwable} is delivered to the global error handler via
56+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code CompletableObserver}s
57+
* cancel at once).
58+
* If there were no {@code CompletableObserver}s subscribed to this {@code CompletableSubject} when the {@code onError()}
59+
* was called, the global error handler is not invoked.
60+
* </dd>
61+
* </dl>
62+
* <p>
63+
* Example usage:
64+
* <pre><code>
65+
* CompletableSubject subject = CompletableSubject.create();
66+
*
67+
* TestObserver&lt;Void&gt; to1 = subject.test();
68+
*
69+
* // a fresh CompletableSubject is empty
70+
* to1.assertEmpty();
71+
*
72+
* subject.onComplete();
73+
*
74+
* // a CompletableSubject is always void of items
75+
* to1.assertResult();
76+
*
77+
* TestObserver&lt;Void&gt; to2 = subject.test()
78+
*
79+
* // late CompletableObservers receive the terminal event
80+
* to2.assertResult();
81+
* </code></pre>
3382
* <p>History: 2.0.5 - experimental
3483
* @since 2.1
3584
*/

src/main/java/io/reactivex/subjects/MaybeSubject.java

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,85 @@
2424
/**
2525
* Represents a hot Maybe-like source and consumer of events similar to Subjects.
2626
* <p>
27-
* All methods are thread safe. Calling onSuccess or onComplete multiple
28-
* times has no effect. Calling onError multiple times relays the Throwable to
29-
* the RxJavaPlugins' error handler.
27+
* This subject does not have a public constructor by design; a new non-terminated instance of this
28+
* {@code MaybeSubject} can be created via the {@link #create()} method.
3029
* <p>
31-
* The MaybeSubject doesn't store the Disposables coming through onSubscribe but
32-
* disposes them once the other onXXX methods were called (terminal state reached).
30+
* Since the {@code MaybeSubject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
31+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
32+
* as parameters to {@link #onSuccess(Object)} and {@link #onError(Throwable)}. Such calls will result in a
33+
* {@link NullPointerException} being thrown and the subject's state is not changed.
34+
* <p>
35+
* Since a {@code MaybeSubject} is a {@link io.reactivex.Maybe}, calling {@code onSuccess}, {@code onError}
36+
* or {@code onComplete} will move this {@code MaybeSubject} into its terminal state atomically.
37+
* <p>
38+
* All methods are thread safe. Calling {@link #onSuccess(Object)} or {@link #onComplete()} multiple
39+
* times has no effect. Calling {@link #onError(Throwable)} multiple times relays the {@code Throwable} to
40+
* the {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} global error handler.
41+
* <p>
42+
* Even though {@code MaybeSubject} implements the {@code MaybeObserver} interface, calling
43+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
44+
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
45+
* after the {@code MaybeSubject} reached its terminal state will result in the
46+
* given {@code Disposable} being disposed immediately.
47+
* <p>
48+
* This {@code MaybeSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
49+
* {@link #getThrowable()} and {@link #hasObservers()} as well as means to read any success item in a non-blocking
50+
* and thread-safe manner via {@link #hasValue()} and {@link #getValue()}.
51+
* <p>
52+
* The {@code MaybeSubject} does not support clearing its cached {@code onSuccess} value.
53+
* <dl>
54+
* <dt><b>Scheduler:</b></dt>
55+
* <dd>{@code MaybeSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
56+
* the {@code MaybeObserver}s get notified on the thread where the terminating {@code onSuccess}, {@code onError} or {@code onComplete}
57+
* methods were invoked.</dd>
58+
* <dt><b>Error handling:</b></dt>
59+
* <dd>When the {@link #onError(Throwable)} is called, the {@code MaybeSubject} enters into a terminal state
60+
* and emits the same {@code Throwable} instance to the last set of {@code MaybeObserver}s. During this emission,
61+
* if one or more {@code MaybeObserver}s dispose their respective {@code Disposable}s, the
62+
* {@code Throwable} is delivered to the global error handler via
63+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code MaybeObserver}s
64+
* cancel at once).
65+
* If there were no {@code MaybeObserver}s subscribed to this {@code MaybeSubject} when the {@code onError()}
66+
* was called, the global error handler is not invoked.
67+
* </dd>
68+
* </dl>
69+
* <p>
70+
* Example usage:
71+
* <pre><code>
72+
* MaybeSubject&lt;Integer&gt; subject1 = MaybeSubject.create();
73+
*
74+
* TestObserver&lt;Integer&gt; to1 = subject1.test();
75+
*
76+
* // MaybeSubjects are empty by default
77+
* to1.assertEmpty();
78+
*
79+
* subject1.onSuccess(1);
80+
*
81+
* // onSuccess is a terminal event with MaybeSubjects
82+
* // TestObserver converts onSuccess into onNext + onComplete
83+
* to1.assertResult(1);
84+
*
85+
* TestObserver&lt;Integer&gt; to2 = subject1.test();
86+
*
87+
* // late Observers receive the terminal signal (onSuccess) too
88+
* to2.assertResult(1);
89+
*
90+
* // -----------------------------------------------------
91+
*
92+
* MaybeSubject&lt;Integer&gt; subject2 = MaybeSubject.create();
93+
*
94+
* TestObserver&lt;Integer&gt; to3 = subject2.test();
95+
*
96+
* subject2.onComplete();
97+
*
98+
* // a completed MaybeSubject completes its MaybeObservers
99+
* to3.assertResult();
100+
*
101+
* TestObserver&lt;Integer&gt; to4 = subject1.test();
102+
*
103+
* // late Observers receive the terminal signal (onComplete) too
104+
* to4.assertResult();
105+
* </code></pre>
33106
* <p>History: 2.0.5 - experimental
34107
* @param <T> the value type received and emitted
35108
* @since 2.1

src/main/java/io/reactivex/subjects/PublishSubject.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,57 @@
2222
import io.reactivex.plugins.RxJavaPlugins;
2323

2424
/**
25-
* Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the
26-
* subscriber.
25+
* A Subject that emits (multicasts) items to currently subscribed {@link Observer}s and terminal events to current
26+
* or late {@code Observer}s.
2727
* <p>
2828
* <img width="640" height="405" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/S.PublishSubject.png" alt="">
2929
* <p>
30+
* This subject does not have a public constructor by design; a new empty instance of this
31+
* {@code PublishSubject} can be created via the {@link #create()} method.
32+
* <p>
33+
* Since a {@code Subject} is conceptionally derived from the {@code Processor} type in the Reactive Streams specification,
34+
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
35+
* parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
36+
* {@link NullPointerException} being thrown and the subject's state is not changed.
37+
* <p>
38+
* Since a {@code PublishSubject} is an {@link io.reactivex.Observable}, it does not support backpressure.
39+
* <p>
40+
* When this {@code PublishSubject} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
41+
* late {@link io.reactivex.Observer}s only receive the respective terminal event.
42+
* <p>
43+
* Unlike a {@link BehaviorSubject}, a {@code PublishSubject} doesn't retain/cache items, therefore, a new
44+
* {@code Observer} won't receive any past items.
45+
* <p>
46+
* Even though {@code PublishSubject} implements the {@code Observer} interface, calling
47+
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
48+
* if the subject is used as a standalone source. However, calling {@code onSubscribe}
49+
* after the {@code PublishSubject} reached its terminal state will result in the
50+
* given {@code Disposable} being disposed immediately.
51+
* <p>
52+
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
53+
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
54+
* through external means of serialization). The {@link #toSerialized()} method available to all {@code Subject}s
55+
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Observer}
56+
* consuming this subject also wants to call {@link #onNext(Object)} on this subject recursively).
57+
* <p>
58+
* This {@code PublishSubject} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
59+
* {@link #getThrowable()} and {@link #hasObservers()}.
60+
* <dl>
61+
* <dt><b>Scheduler:</b></dt>
62+
* <dd>{@code PublishSubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and
63+
* the {@code Observer}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
64+
* <dt><b>Error handling:</b></dt>
65+
* <dd>When the {@link #onError(Throwable)} is called, the {@code PublishSubject} enters into a terminal state
66+
* and emits the same {@code Throwable} instance to the last set of {@code Observer}s. During this emission,
67+
* if one or more {@code Observer}s dispose their respective {@code Disposable}s, the
68+
* {@code Throwable} is delivered to the global error handler via
69+
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Observer}s
70+
* cancel at once).
71+
* If there were no {@code Observer}s subscribed to this {@code PublishSubject} when the {@code onError()}
72+
* was called, the global error handler is not invoked.
73+
* </dd>
74+
* </dl>
75+
* <p>
3076
* Example usage:
3177
* <pre> {@code
3278
@@ -40,6 +86,8 @@
4086
subject.onNext("three");
4187
subject.onComplete();
4288
89+
// late Observers only receive the terminal event
90+
subject.test().assertEmpty();
4391
} </pre>
4492
*
4593
* @param <T>

0 commit comments

Comments
 (0)