Skip to content

2.x: rename Observable and Single #doOnCancel to #doOnDispose #4458

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ public final Completable doOnComplete(Action onComplete) {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnDispose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onDispose the callback to call when the child subscriber cancels the subscription
* @param onDispose the callback to call when the child subscriber disposes the subscription
* @return the new Completable instance
* @throws NullPointerException if onDispose is null
*/
Expand Down Expand Up @@ -995,7 +995,7 @@ public final Completable doOnError(Consumer<? super Throwable> onError) {
* @param onError the consumer called when this emits an onError event
* @param onComplete the runnable called just before when this Completable completes normally
* @param onAfterTerminate the runnable called after this Completable completes normally
* @param onDisposed the runnable called when the child cancels the subscription
* @param onDispose the runnable called when the child disposes the subscription
* @return the new Completable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
Expand All @@ -1005,14 +1005,14 @@ private Completable doOnLifecycle(
final Action onComplete,
final Action onTerminate,
final Action onAfterTerminate,
final Action onDisposed) {
final Action onDispose) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onTerminate, "onTerminate is null");
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
ObjectHelper.requireNonNull(onDisposed, "onDisposed is null");
return RxJavaPlugins.onAssembly(new CompletablePeek(this, onSubscribe, onError, onComplete, onTerminate, onAfterTerminate, onDisposed));
ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new CompletablePeek(this, onSubscribe, onError, onComplete, onTerminate, onAfterTerminate, onDispose));
}

/**
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6294,17 +6294,17 @@ public final Observable<T> doAfterTerminate(Action onFinally) {
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doOnUnsubscribe.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnUnsubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* <dd>{@code doOnDispose} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param onCancel
* the action that gets called when the source {@code ObservableSource}'s Subscription is cancelled
* @param onDispose
* the action that gets called when the source {@code ObservableSource}'s Subscription is disposed
* @return the source {@code ObservableSource} modified so as to call this Action when appropriate
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnCancel(Action onCancel) {
return doOnLifecycle(Functions.emptyConsumer(), onCancel);
public final Observable<T> doOnDispose(Action onDispose) {
return doOnLifecycle(Functions.emptyConsumer(), onDispose);
}

/**
Expand Down Expand Up @@ -6437,16 +6437,16 @@ public final Observable<T> doOnError(Consumer<? super Throwable> onError) {
*
* @param onSubscribe
* a Consumer called with the Subscription sent via Subscriber.onSubscribe()
* @param onCancel
* called when the downstream cancels the Subscription via cancel()
* @param onDispose
* called when the downstream disposes the Subscription via dispose()
* @return the source ObservableSource with the side-effecting behavior applied
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onCancel) {
public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose) {
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
ObjectHelper.requireNonNull(onCancel, "onCancel is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnLifecycle<T>(this, onSubscribe, onCancel));
ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new ObservableDoOnLifecycle<T>(this, onSubscribe, onDispose));
}

/**
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1579,13 +1579,13 @@ public final Single<T> doOnError(final Consumer<? super Throwable> onError) {
* <dt><b>Scheduler:</b></dt>
* <dd>{@code doOnSubscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param onCancel the runnable called when the subscription is cancelled (disposed)
* @param onDispose the runnable called when the subscription is disposed
* @return the new Single instance
* @since 2.0
*/
public final Single<T> doOnCancel(final Action onCancel) {
ObjectHelper.requireNonNull(onCancel, "onCancel is null");
return RxJavaPlugins.onAssembly(new SingleDoOnCancel<T>(this, onCancel));
public final Single<T> doOnDispose(final Action onDispose) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should not we deprecate doOnCancel and then remove it with RC3?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the discussion #4451 came to a consensus that no need for going soft but delete what's no longer needed before releasing RC2, hence no deprecation.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright works for me too

ObjectHelper.requireNonNull(onDispose, "onDispose is null");
return RxJavaPlugins.onAssembly(new SingleDoOnDispose<T>(this, onDispose));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ public final class CompletablePeek extends Completable {
final Action onComplete;
final Action onTerminate;
final Action onAfterTerminate;
final Action onDisposed;
final Action onDispose;

public CompletablePeek(CompletableSource source, Consumer<? super Disposable> onSubscribe,
Consumer<? super Throwable> onError,
Action onComplete,
Action onTerminate,
Action onAfterTerminate,
Action onDisposed) {
Action onDispose) {
this.source = source;
this.onSubscribe = onSubscribe;
this.onError = onError;
this.onComplete = onComplete;
this.onTerminate = onTerminate;
this.onAfterTerminate = onAfterTerminate;
this.onDisposed = onDisposed;
this.onDispose = onDispose;
}

@Override
Expand Down Expand Up @@ -107,7 +107,7 @@ public void onSubscribe(final Disposable d) {
@Override
public void run() {
try {
onDisposed.run();
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@

public final class ObservableDoOnLifecycle<T> extends AbstractObservableWithUpstream<T, T> {
private final Consumer<? super Disposable> onSubscribe;
private final Action onCancel;
private final Action onDispose;

public ObservableDoOnLifecycle(Observable<T> upstream, Consumer<? super Disposable> onSubscribe,
Action onCancel) {
Action onDispose) {
super(upstream);
this.onSubscribe = onSubscribe;
this.onCancel = onCancel;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(Observer<? super T> observer) {
source.subscribe(new SubscriptionLambdaObserver<T>(observer, onSubscribe, onCancel));
source.subscribe(new SubscriptionLambdaObserver<T>(observer, onSubscribe, onDispose));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,38 @@
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.plugins.RxJavaPlugins;

public final class SingleDoOnCancel<T> extends Single<T> {
public final class SingleDoOnDispose<T> extends Single<T> {
final SingleSource<T> source;

final Action onCancel;
final Action onDispose;

public SingleDoOnCancel(SingleSource<T> source, Action onCancel) {
public SingleDoOnDispose(SingleSource<T> source, Action onDispose) {
this.source = source;
this.onCancel = onCancel;
this.onDispose = onDispose;
}

@Override
protected void subscribeActual(final SingleObserver<? super T> s) {

source.subscribe(new DoOnCancelObserver<T>(s, onCancel));
source.subscribe(new DoOnDisposeObserver<T>(s, onDispose));
}

static final class DoOnCancelObserver<T> implements SingleObserver<T>, Disposable {
static final class DoOnDisposeObserver<T> implements SingleObserver<T>, Disposable {
final SingleObserver<? super T> actual;

final Action onCancel;
final Action onDispose;

Disposable d;

public DoOnCancelObserver(SingleObserver<? super T> actual, Action onCancel) {
public DoOnDisposeObserver(SingleObserver<? super T> actual, Action onDispose) {
this.actual = actual;
this.onCancel = onCancel;
this.onDispose = onDispose;
}

@Override
public void dispose() {
try {
onCancel.run();
onDispose.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@
public final class SubscriptionLambdaObserver<T> implements Observer<T>, Disposable {
final Observer<? super T> actual;
final Consumer<? super Disposable> onSubscribe;
final Action onCancel;
final Action onDispose;

Disposable s;

public SubscriptionLambdaObserver(Observer<? super T> actual,
Consumer<? super Disposable> onSubscribe,
Action onCancel) {
Action onDispose) {
this.actual = actual;
this.onSubscribe = onSubscribe;
this.onCancel = onCancel;
this.onDispose = onDispose;
}

@Override
Expand Down Expand Up @@ -73,7 +73,7 @@ public void onComplete() {
@Override
public void dispose() {
try {
onCancel.run();
onDispose.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void accept(String v) {
@Test
public void testUnsubscribeSource() throws Exception {
Action unsubscribe = mock(Action.class);
Observable<Integer> o = Observable.just(1).doOnCancel(unsubscribe).cache();
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
o.subscribe();
o.subscribe();
o.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void testDelaySubscription() {
}

@Test
public void testDelaySubscriptionCancelBeforeTime() {
public void testDelaySubscriptionDisposeBeforeTime() {
Observable<Integer> result = Observable.just(1, 2, 3).delaySubscription(100, TimeUnit.MILLISECONDS, scheduler);

Observer<Object> o = TestHelper.mockObserver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void testDoOnUnsubscribe() throws Exception {
// The stream needs to be infinite to ensure the stream does not terminate
// before it is unsubscribed
.interval(50, TimeUnit.MILLISECONDS)
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
// Test that upper stream will be notified for un-subscription
Expand All @@ -57,7 +57,7 @@ public void accept(Long aLong) {
onNextLatch.countDown();
}
})
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
// Test that lower stream will be notified for a direct un-subscription
Expand Down Expand Up @@ -103,7 +103,7 @@ public void testDoOnUnSubscribeWorksWithRefCount() throws Exception {
// The stream needs to be infinite to ensure the stream does not terminate
// before it is unsubscribed
.interval(50, TimeUnit.MILLISECONDS)
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
// Test that upper stream will be notified for un-subscription
Expand All @@ -118,7 +118,7 @@ public void accept(Long aLong) {
onNextLatch.countDown();
}
})
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
// Test that lower stream will be notified for un-subscription
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testErrorReceived() {
@Test
public void testUnsubscribesFromUpstream() {
final AtomicBoolean unsub = new AtomicBoolean();
Observable.range(1, 10).doOnCancel(new Action() {
Observable.range(1, 10).doOnDispose(new Action() {
@Override
public void run() {
unsub.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public void accept(Integer t1) {
sourceEmission.incrementAndGet();
}
})
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
sourceUnsubscribed.set(true);
Expand All @@ -209,7 +209,7 @@ public void run() {
@Override
public void onNext(Integer t) {
if (valueCount() == 2) {
source.doOnCancel(new Action() {
source.doOnDispose(new Action() {
@Override
public void run() {
child2Unsubscribed.set(true);
Expand All @@ -220,7 +220,7 @@ public void run() {
}
};

source.doOnCancel(new Action() {
source.doOnDispose(new Action() {
@Override
public void run() {
child1Unsubscribed.set(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void accept(Disposable s) {
subscribeCount.incrementAndGet();
}
})
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
Expand Down Expand Up @@ -215,7 +215,7 @@ public void accept(Disposable s) {
subscribeLatch.countDown();
}
})
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testConnectUnsubscribeRaceConditionLoop() throws InterruptedExceptio
public void testConnectUnsubscribeRaceCondition() throws InterruptedException {
final AtomicInteger subUnsubCount = new AtomicInteger();
Observable<Long> o = synchronousInterval()
.doOnCancel(new Action() {
.doOnDispose(new Action() {
@Override
public void run() {
System.out.println("******************************* Unsubscribe received");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ public void testIssue2191_UnsubscribeSource() throws Exception {

ConnectableObservable<Integer> replay = source
.doOnNext(sourceNext)
.doOnCancel(sourceUnsubscribed)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.replay();

Expand Down Expand Up @@ -572,7 +572,7 @@ public void testIssue2191_SchedulerUnsubscribe() throws Exception {
// NbpObservable under test
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3)
.doOnNext(sourceNext)
.doOnCancel(sourceUnsubscribed)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.subscribeOn(mockScheduler).replay();

Expand Down Expand Up @@ -629,7 +629,7 @@ public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception {
when(mockFunc.apply(2)).thenThrow(illegalArgumentException);
ConnectableObservable<Integer> replay = Observable.just(1, 2, 3).map(mockFunc)
.doOnNext(sourceNext)
.doOnCancel(sourceUnsubscribed)
.doOnDispose(sourceUnsubscribed)
.doOnComplete(sourceCompleted)
.doOnError(sourceError)
.subscribeOn(mockScheduler).replay();
Expand Down Expand Up @@ -850,7 +850,7 @@ public void accept(String v) {
@Test
public void testUnsubscribeSource() throws Exception {
Action unsubscribe = mock(Action.class);
Observable<Integer> o = Observable.just(1).doOnCancel(unsubscribe).cache();
Observable<Integer> o = Observable.just(1).doOnDispose(unsubscribe).cache();
o.subscribe();
o.subscribe();
o.subscribe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void run() {
unsubscribed.set(true);
}
};
Observable.just(1).doOnCancel(unsubscribeAction)
Observable.just(1).doOnDispose(unsubscribeAction)
.takeLast(1).subscribe();
assertTrue(unsubscribed.get());
}
Expand Down
Loading