Skip to content

1.x: cleanup, javadoc, Completable.fromEmitter #4442

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 30, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions src/main/java/rx/AsyncEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* other methods are threadsafe.
*
* @param <T> the value type to emit
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public interface AsyncEmitter<T> extends Observer<T> {
Expand Down Expand Up @@ -69,14 +70,28 @@ interface Cancellable {
* Options to handle backpressure in the emitter.
*/
enum BackpressureMode {
/**
* No backpressure is applied an the onNext calls pass through the AsyncEmitter;
* note that this may cause {@link rx.exceptions.MissingBackpressureException} or {@link IllegalStateException}
* somewhere downstream.
*/
NONE,

/**
* Signals a {@link rx.exceptions.MissingBackpressureException} if the downstream can't keep up.
*/
ERROR,

/**
* Buffers (unbounded) all onNext calls until the dowsntream can consume them.
*/
BUFFER,

/**
* Drops the incoming onNext value if the downstream can't keep up.
*/
DROP,

/**
* Keeps the latest onNext value and overwrites it with newer ones until the downstream
* can consume it.
*/
LATEST
}
}
55 changes: 54 additions & 1 deletion src/main/java/rx/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,9 @@ public static Completable concat(Observable<? extends Completable> sources, int
}

/**
* Constructs a Completable instance by wrapping the given old onSubscribe callback.
* @param onSubscribe the old CompletableOnSubscribe instance to wrap
* @return a Completable instance
* @deprecated Use {@link #create(OnSubscribe)}.
*/
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
Expand Down Expand Up @@ -620,6 +623,44 @@ public void call(rx.CompletableSubscriber s) {
});
}

/**
* Provides an API (in a cold Completable) that bridges the Completable-reactive world
* with the callback-based world.
* <p>The {@link CompletableEmitter} allows registering a callback for
* cancellation/unsubscription of a resource.
* <p>
* Example:
* <pre><code>
* Completable.fromEmitter(emitter -&gt; {
* Callback listener = new Callback() {
* &#64;Override
* public void onEvent(Event e) {
* emitter.onCompleted();
* }
*
* &#64;Override
* public void onFailure(Exception e) {
* emitter.onError(e);
* }
* };
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellation(c::close);
*
* });
* </code></pre>
* <p>All of the CompletableEmitter's methods are thread-safe and ensure the
* Completable's protocol are held.
* @param producer the callback invoked for each incoming CompletableSubscriber
* @return the new Completable instance
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static Completable fromEmitter(Action1<CompletableEmitter> producer) {
return create(new CompletableFromEmitter(producer));
}

/**
* Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
* <p>
Expand Down Expand Up @@ -1091,11 +1132,16 @@ protected Completable(CompletableOnSubscribe onSubscribe) {
* not null (not verified)
* @param useHook if false, RxJavaHooks.onCreate won't be called
*/
private Completable(OnSubscribe onSubscribe, boolean useHook) {
protected Completable(OnSubscribe onSubscribe, boolean useHook) {
this.onSubscribe = useHook ? RxJavaHooks.onCreate(onSubscribe) : onSubscribe;
}

/**
* Constructs a Completable instance with the given onSubscribe callback without calling the onCreate
* hook.
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
* not null (not verified)
* @param useHook if false, RxJavaHooks.onCreate won't be called
* @deprecated Use {@link #Completable(OnSubscribe, boolean)}.
*/
@Deprecated // TODO Remove constructor after 1.2.0 release. It was never a stable API. Provided only for migration.
Expand Down Expand Up @@ -1661,6 +1707,10 @@ public void onSubscribe(Subscription d) {
}

/**
* Lifts a CompletableSubscriber transformation into the chain of Completables.
* @param onLift the lifting function that transforms the child subscriber with a parent subscriber.
* @return the new Completable instance
* @throws NullPointerException if onLift is null
* @deprecated Use {@link #lift(Operator)}.
*/
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
Expand Down Expand Up @@ -2132,6 +2182,9 @@ private static void deliverUncaughtException(Throwable e) {
}

/**
* Subscribes the given CompletableSubscriber to this Completable instance.
* @param s the CompletableSubscriber, not null
* @throws NullPointerException if s is null
* @deprecated Use {@link #unsafeSubscribe(rx.CompletableSubscriber)}.
*/
@Deprecated // TODO Remove interface after 1.2.0 release. It was never a stable API. Provided only for migration.
Expand Down
64 changes: 64 additions & 0 deletions src/main/java/rx/CompletableEmitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import rx.AsyncEmitter.Cancellable;
import rx.annotations.Experimental;

/**
* Abstraction over a {@link CompletableSubscriber} that gets either an onCompleted or onError
* signal and allows registering an cancellation/unsubscription callback.
* <p>
* All methods are thread-safe; calling onCompleted or onError twice or one after the other has
* no effect.
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public interface CompletableEmitter {

/**
* Notifies the CompletableSubscriber that the {@link Completable} has finished
* sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onCompleted();

/**
* Notifies the CompletableSubscriber that the {@link Completable} has experienced an error condition.
* <p>
* If the {@link Completable} calls this method, it will not thereafter call
* {@link #onCompleted}.
*
* @param t
* the exception encountered by the Observable
*/
void onError(Throwable t);

/**
* Sets a Subscription on this emitter; any previous Subscription
* or Cancellation will be unsubscribed/cancelled.
* @param s the subscription, null is allowed
*/
void setSubscription(Subscription s);

/**
* Sets a Cancellable on this emitter; any previous Subscription
* or Cancellation will be unsubscribed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellation(Cancellable c);
}
52 changes: 50 additions & 2 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2007,7 +2007,7 @@ public static <T> Observable<T> from(T[] array) {
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellable(c::close);
* emitter.setCancellation(c::close);
*
* }, BackpressureMode.BUFFER);
* </code></pre>
Expand All @@ -2022,10 +2022,58 @@ public static <T> Observable<T> from(T[] array) {
* @see AsyncEmitter
* @see AsyncEmitter.BackpressureMode
* @see AsyncEmitter.Cancellable
* @deprecated renamed, use {@link #fromEmitter(Action1, rx.AsyncEmitter.BackpressureMode)} instead
*/
@Experimental
@Deprecated // will be removed in 1.2.0
public static <T> Observable<T> fromAsync(Action1<AsyncEmitter<T>> asyncEmitter, AsyncEmitter.BackpressureMode backpressure) {
return create(new OnSubscribeFromAsync<T>(asyncEmitter, backpressure));
return fromEmitter(asyncEmitter, backpressure);
}

/**
* Provides an API (via a cold Observable) that bridges the reactive world with the callback-style,
* generally non-backpressured world.
* <p>
* Example:
* <pre><code>
* Observable.&lt;Event&gt;fromEmitter(emitter -&gt; {
* Callback listener = new Callback() {
* &#64;Override
* public void onEvent(Event e) {
* emitter.onNext(e);
* if (e.isLast()) {
* emitter.onCompleted();
* }
* }
*
* &#64;Override
* public void onFailure(Exception e) {
* emitter.onError(e);
* }
* };
*
* AutoCloseable c = api.someMethod(listener);
*
* emitter.setCancellation(c::close);
*
* }, BackpressureMode.BUFFER);
* </code></pre>
* <p>
* You should call the AsyncEmitter's onNext, onError and onCompleted methods in a serialized fashion. The
* rest of its methods are threadsafe.
*
* @param <T> the element type
* @param emitter the emitter that is called when a Subscriber subscribes to the returned {@code Observable}
* @param backpressure the backpressure mode to apply if the downstream Subscriber doesn't request (fast) enough
* @return the new Observable instance
* @see AsyncEmitter
* @see AsyncEmitter.BackpressureMode
* @see AsyncEmitter.Cancellable
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
*/
@Experimental
public static <T> Observable<T> fromEmitter(Action1<AsyncEmitter<T>> emitter, AsyncEmitter.BackpressureMode backpressure) {
return create(new OnSubscribeFromEmitter<T>(emitter, backpressure));
}

/**
Expand Down
116 changes: 116 additions & 0 deletions src/main/java/rx/internal/operators/CompletableFromEmitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;

import java.util.concurrent.atomic.AtomicBoolean;

import rx.*;
import rx.AsyncEmitter.Cancellable;
import rx.exceptions.Exceptions;
import rx.functions.Action1;
import rx.internal.operators.OnSubscribeFromEmitter.CancellableSubscription;
import rx.internal.subscriptions.SequentialSubscription;
import rx.plugins.RxJavaHooks;

/**
* Allows push-based emission of terminal events to a CompletableSubscriber.
*/
public final class CompletableFromEmitter implements Completable.OnSubscribe {

final Action1<CompletableEmitter> producer;

public CompletableFromEmitter(Action1<CompletableEmitter> producer) {
this.producer = producer;
}

@Override
public void call(CompletableSubscriber t) {
FromEmitter emitter = new FromEmitter(t);
t.onSubscribe(emitter);

try {
producer.call(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}

}

static final class FromEmitter
extends AtomicBoolean
implements CompletableEmitter, Subscription {

/** */
private static final long serialVersionUID = 5539301318568668881L;

final CompletableSubscriber actual;

final SequentialSubscription resource;

public FromEmitter(CompletableSubscriber actual) {
this.actual = actual;
resource = new SequentialSubscription();
}

@Override
public void onCompleted() {
if (compareAndSet(false, true)) {
try {
actual.onCompleted();
} finally {
resource.unsubscribe();
}
}
}

@Override
public void onError(Throwable t) {
if (compareAndSet(false, true)) {
try {
actual.onError(t);
} finally {
resource.unsubscribe();
}
} else {
RxJavaHooks.onError(t);
}
}

@Override
public void setSubscription(Subscription s) {
resource.update(s);
}

@Override
public void setCancellation(Cancellable c) {
setSubscription(new CancellableSubscription(c));
}

@Override
public void unsubscribe() {
if (compareAndSet(false, true)) {
resource.unsubscribe();
}
}

@Override
public boolean isUnsubscribed() {
return get();
}

}
}
Loading