Skip to content

Commit 0ee2e61

Browse files
authored
1.x: cleanup, javadoc, Completable.fromEmitter (#4442)
1 parent db3ff46 commit 0ee2e61

20 files changed

+815
-88
lines changed

src/main/java/rx/AsyncEmitter.java

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
* other methods are threadsafe.
2929
*
3030
* @param <T> the value type to emit
31+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
3132
*/
3233
@Experimental
3334
public interface AsyncEmitter<T> extends Observer<T> {
@@ -69,14 +70,28 @@ interface Cancellable {
6970
* Options to handle backpressure in the emitter.
7071
*/
7172
enum BackpressureMode {
73+
/**
74+
* No backpressure is applied an the onNext calls pass through the AsyncEmitter;
75+
* note that this may cause {@link rx.exceptions.MissingBackpressureException} or {@link IllegalStateException}
76+
* somewhere downstream.
77+
*/
7278
NONE,
73-
79+
/**
80+
* Signals a {@link rx.exceptions.MissingBackpressureException} if the downstream can't keep up.
81+
*/
7482
ERROR,
75-
83+
/**
84+
* Buffers (unbounded) all onNext calls until the dowsntream can consume them.
85+
*/
7686
BUFFER,
77-
87+
/**
88+
* Drops the incoming onNext value if the downstream can't keep up.
89+
*/
7890
DROP,
79-
91+
/**
92+
* Keeps the latest onNext value and overwrites it with newer ones until the downstream
93+
* can consume it.
94+
*/
8095
LATEST
8196
}
8297
}

src/main/java/rx/Completable.java

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,9 @@ public static Completable concat(Observable<? extends Completable> sources, int
457457
}
458458

459459
/**
460+
* Constructs a Completable instance by wrapping the given old onSubscribe callback.
461+
* @param onSubscribe the old CompletableOnSubscribe instance to wrap
462+
* @return a Completable instance
460463
* @deprecated Use {@link #create(OnSubscribe)}.
461464
*/
462465
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
@@ -620,6 +623,44 @@ public void call(rx.CompletableSubscriber s) {
620623
});
621624
}
622625

626+
/**
627+
* Provides an API (in a cold Completable) that bridges the Completable-reactive world
628+
* with the callback-based world.
629+
* <p>The {@link CompletableEmitter} allows registering a callback for
630+
* cancellation/unsubscription of a resource.
631+
* <p>
632+
* Example:
633+
* <pre><code>
634+
* Completable.fromEmitter(emitter -&gt; {
635+
* Callback listener = new Callback() {
636+
* &#64;Override
637+
* public void onEvent(Event e) {
638+
* emitter.onCompleted();
639+
* }
640+
*
641+
* &#64;Override
642+
* public void onFailure(Exception e) {
643+
* emitter.onError(e);
644+
* }
645+
* };
646+
*
647+
* AutoCloseable c = api.someMethod(listener);
648+
*
649+
* emitter.setCancellation(c::close);
650+
*
651+
* });
652+
* </code></pre>
653+
* <p>All of the CompletableEmitter's methods are thread-safe and ensure the
654+
* Completable's protocol are held.
655+
* @param producer the callback invoked for each incoming CompletableSubscriber
656+
* @return the new Completable instance
657+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
658+
*/
659+
@Experimental
660+
public static Completable fromEmitter(Action1<CompletableEmitter> producer) {
661+
return create(new CompletableFromEmitter(producer));
662+
}
663+
623664
/**
624665
* Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
625666
* <p>
@@ -1091,11 +1132,16 @@ protected Completable(CompletableOnSubscribe onSubscribe) {
10911132
* not null (not verified)
10921133
* @param useHook if false, RxJavaHooks.onCreate won't be called
10931134
*/
1094-
private Completable(OnSubscribe onSubscribe, boolean useHook) {
1135+
protected Completable(OnSubscribe onSubscribe, boolean useHook) {
10951136
this.onSubscribe = useHook ? RxJavaHooks.onCreate(onSubscribe) : onSubscribe;
10961137
}
10971138

10981139
/**
1140+
* Constructs a Completable instance with the given onSubscribe callback without calling the onCreate
1141+
* hook.
1142+
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
1143+
* not null (not verified)
1144+
* @param useHook if false, RxJavaHooks.onCreate won't be called
10991145
* @deprecated Use {@link #Completable(OnSubscribe, boolean)}.
11001146
*/
11011147
@Deprecated // TODO Remove constructor after 1.2.0 release. It was never a stable API. Provided only for migration.
@@ -1661,6 +1707,10 @@ public void onSubscribe(Subscription d) {
16611707
}
16621708

16631709
/**
1710+
* Lifts a CompletableSubscriber transformation into the chain of Completables.
1711+
* @param onLift the lifting function that transforms the child subscriber with a parent subscriber.
1712+
* @return the new Completable instance
1713+
* @throws NullPointerException if onLift is null
16641714
* @deprecated Use {@link #lift(Operator)}.
16651715
*/
16661716
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
@@ -2132,6 +2182,9 @@ private static void deliverUncaughtException(Throwable e) {
21322182
}
21332183

21342184
/**
2185+
* Subscribes the given CompletableSubscriber to this Completable instance.
2186+
* @param s the CompletableSubscriber, not null
2187+
* @throws NullPointerException if s is null
21352188
* @deprecated Use {@link #unsafeSubscribe(rx.CompletableSubscriber)}.
21362189
*/
21372190
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import rx.AsyncEmitter.Cancellable;
19+
import rx.annotations.Experimental;
20+
21+
/**
22+
* Abstraction over a {@link CompletableSubscriber} that gets either an onCompleted or onError
23+
* signal and allows registering an cancellation/unsubscription callback.
24+
* <p>
25+
* All methods are thread-safe; calling onCompleted or onError twice or one after the other has
26+
* no effect.
27+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
28+
*/
29+
@Experimental
30+
public interface CompletableEmitter {
31+
32+
/**
33+
* Notifies the CompletableSubscriber that the {@link Completable} has finished
34+
* sending push-based notifications.
35+
* <p>
36+
* The {@link Observable} will not call this method if it calls {@link #onError}.
37+
*/
38+
void onCompleted();
39+
40+
/**
41+
* Notifies the CompletableSubscriber that the {@link Completable} has experienced an error condition.
42+
* <p>
43+
* If the {@link Completable} calls this method, it will not thereafter call
44+
* {@link #onCompleted}.
45+
*
46+
* @param t
47+
* the exception encountered by the Observable
48+
*/
49+
void onError(Throwable t);
50+
51+
/**
52+
* Sets a Subscription on this emitter; any previous Subscription
53+
* or Cancellation will be unsubscribed/cancelled.
54+
* @param s the subscription, null is allowed
55+
*/
56+
void setSubscription(Subscription s);
57+
58+
/**
59+
* Sets a Cancellable on this emitter; any previous Subscription
60+
* or Cancellation will be unsubscribed/cancelled.
61+
* @param c the cancellable resource, null is allowed
62+
*/
63+
void setCancellation(Cancellable c);
64+
}

src/main/java/rx/Observable.java

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2007,7 +2007,7 @@ public static <T> Observable<T> from(T[] array) {
20072007
*
20082008
* AutoCloseable c = api.someMethod(listener);
20092009
*
2010-
* emitter.setCancellable(c::close);
2010+
* emitter.setCancellation(c::close);
20112011
*
20122012
* }, BackpressureMode.BUFFER);
20132013
* </code></pre>
@@ -2022,10 +2022,58 @@ public static <T> Observable<T> from(T[] array) {
20222022
* @see AsyncEmitter
20232023
* @see AsyncEmitter.BackpressureMode
20242024
* @see AsyncEmitter.Cancellable
2025+
* @deprecated renamed, use {@link #fromEmitter(Action1, rx.AsyncEmitter.BackpressureMode)} instead
20252026
*/
20262027
@Experimental
2028+
@Deprecated // will be removed in 1.2.0
20272029
public static <T> Observable<T> fromAsync(Action1<AsyncEmitter<T>> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) {
2028-
return create(new OnSubscribeFromAsync<T>(asyncEmitter, backpressure));
2030+
return fromEmitter(asyncEmitter, backpressure);
2031+
}
2032+
2033+
/**
2034+
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style,
2035+
* generally non-backpressured world.
2036+
* <p>
2037+
* Example:
2038+
* <pre><code>
2039+
* Observable.&lt;Event&gt;fromEmitter(emitter -&gt; {
2040+
* Callback listener = new Callback() {
2041+
* &#64;Override
2042+
* public void onEvent(Event e) {
2043+
* emitter.onNext(e);
2044+
* if (e.isLast()) {
2045+
* emitter.onCompleted();
2046+
* }
2047+
* }
2048+
*
2049+
* &#64;Override
2050+
* public void onFailure(Exception e) {
2051+
* emitter.onError(e);
2052+
* }
2053+
* };
2054+
*
2055+
* AutoCloseable c = api.someMethod(listener);
2056+
*
2057+
* emitter.setCancellation(c::close);
2058+
*
2059+
* }, BackpressureMode.BUFFER);
2060+
* </code></pre>
2061+
* <p>
2062+
* You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The
2063+
* rest of its methods are threadsafe.
2064+
*
2065+
* @param <T> the element type
2066+
* @param emitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable}
2067+
* @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
2068+
* @return the new Observable instance
2069+
* @see AsyncEmitter
2070+
* @see AsyncEmitter.BackpressureMode
2071+
* @see AsyncEmitter.Cancellable
2072+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
2073+
*/
2074+
@Experimental
2075+
public static <T> Observable<T> fromEmitter(Action1<AsyncEmitter<T>> emitter, AsyncEmitter.BackpressureMode backpressure) {
2076+
return create(new OnSubscribeFromEmitter<T>(emitter, backpressure));
20292077
}
20302078

20312079
/**
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
20+
import rx.*;
21+
import rx.AsyncEmitter.Cancellable;
22+
import rx.exceptions.Exceptions;
23+
import rx.functions.Action1;
24+
import rx.internal.operators.OnSubscribeFromEmitter.CancellableSubscription;
25+
import rx.internal.subscriptions.SequentialSubscription;
26+
import rx.plugins.RxJavaHooks;
27+
28+
/**
29+
* Allows push-based emission of terminal events to a CompletableSubscriber.
30+
*/
31+
public final class CompletableFromEmitter implements Completable.OnSubscribe {
32+
33+
final Action1<CompletableEmitter> producer;
34+
35+
public CompletableFromEmitter(Action1<CompletableEmitter> producer) {
36+
this.producer = producer;
37+
}
38+
39+
@Override
40+
public void call(CompletableSubscriber t) {
41+
FromEmitter emitter = new FromEmitter(t);
42+
t.onSubscribe(emitter);
43+
44+
try {
45+
producer.call(emitter);
46+
} catch (Throwable ex) {
47+
Exceptions.throwIfFatal(ex);
48+
emitter.onError(ex);
49+
}
50+
51+
}
52+
53+
static final class FromEmitter
54+
extends AtomicBoolean
55+
implements CompletableEmitter, Subscription {
56+
57+
/** */
58+
private static final long serialVersionUID = 5539301318568668881L;
59+
60+
final CompletableSubscriber actual;
61+
62+
final SequentialSubscription resource;
63+
64+
public FromEmitter(CompletableSubscriber actual) {
65+
this.actual = actual;
66+
resource = new SequentialSubscription();
67+
}
68+
69+
@Override
70+
public void onCompleted() {
71+
if (compareAndSet(false, true)) {
72+
try {
73+
actual.onCompleted();
74+
} finally {
75+
resource.unsubscribe();
76+
}
77+
}
78+
}
79+
80+
@Override
81+
public void onError(Throwable t) {
82+
if (compareAndSet(false, true)) {
83+
try {
84+
actual.onError(t);
85+
} finally {
86+
resource.unsubscribe();
87+
}
88+
} else {
89+
RxJavaHooks.onError(t);
90+
}
91+
}
92+
93+
@Override
94+
public void setSubscription(Subscription s) {
95+
resource.update(s);
96+
}
97+
98+
@Override
99+
public void setCancellation(Cancellable c) {
100+
setSubscription(new CancellableSubscription(c));
101+
}
102+
103+
@Override
104+
public void unsubscribe() {
105+
if (compareAndSet(false, true)) {
106+
resource.unsubscribe();
107+
}
108+
}
109+
110+
@Override
111+
public boolean isUnsubscribed() {
112+
return get();
113+
}
114+
115+
}
116+
}

0 commit comments

Comments
 (0)