Skip to content

2.x: add subscribeWith to get back the subscriber/observer fluently #4422

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 24, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,31 @@ public final void subscribe(CompletableObserver s) {
*/
protected abstract void subscribeActual(CompletableObserver s);

/**
* Subscribes a given CompletableObserver (subclass) to this Completable and returns the given
* CompletableObserver as is.
* <p>Usage example:
* <pre><code>
* Completable<Integer> source = Completable.complete().delay(1, TimeUnit.SECONDS);
* CompositeDisposable composite = new CompositeDisposable();
*
* class ResourceCompletableObserver implements CompletableObserver, Disposable {
* // ...
* }
*
* composite.add(source.subscribeWith(new ResourceCompletableObserver()));
* </code></pre>
* @param <E> the type of the CompletableObserver to use and return
* @param observer the CompletableObserver (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
public final <E extends CompletableObserver> E subscribeWith(E observer) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will there also be an overload that only takes onComplete? Or something that takes 2 arguments, onComplete & onError?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the lines below.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ups 😕 thanks

subscribe(observer);
return observer;
}

/**
* Subscribes to this Completable and calls back either the onError or onComplete functions.
* <dl>
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -11709,7 +11709,32 @@ public final void subscribe(Subscriber<? super T> s) {
* @param s the incoming Subscriber, never null
*/
protected abstract void subscribeActual(Subscriber<? super T> s);


/**
* Subscribes a given Subscriber (subclass) to this Flowable and returns the given
* Subscriber as is.
* <p>Usage example:
* <pre><code>
* Flowable<Integer> source = Flowable.range(1, 10);
* CompositeDisposable composite = new CompositeDisposable();
*
* ResourceSubscriber&lt;Integer> rs = new ResourceSubscriber&lt;>() {
* // ...
* };
*
* composite.add(source.subscribeWith(rs));
* </code></pre>
* @param <E> the type of the Subscriber to use and return
* @param subscriber the Subscriber (subclass) to use and return, not null
* @return the input {@code subscriber}
* @throws NullPointerException if {@code subscriber} is null
* @since 2.0
*/
public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
subscribe(subscriber);
return subscriber;
}

/**
* Asynchronously subscribes Observers to this Publisher on the specified {@link Scheduler}.
* <p>
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -9886,6 +9886,31 @@ public final void subscribe(Observer<? super T> observer) {
*/
protected abstract void subscribeActual(Observer<? super T> observer);

/**
* Subscribes a given Observer (subclass) to this Observable and returns the given
* Observer as is.
* <p>Usage example:
* <pre><code>
* Observable<Integer> source = Observable.range(1, 10);
* CompositeDisposable composite = new CompositeDisposable();
*
* ResourceObserver&lt;Integer> rs = new ResourceSubscriber&lt;>() {
* // ...
* };
*
* composite.add(source.subscribeWith(rs));
* </code></pre>
* @param <E> the type of the Observer to use and return
* @param observer the Observer (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}

/**
* Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
* <p>
Expand Down
27 changes: 26 additions & 1 deletion src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -2156,7 +2156,32 @@ public final void subscribe(SingleObserver<? super T> subscriber) {
* @param observer the SingleObserver to handle, not null
*/
protected abstract void subscribeActual(SingleObserver<? super T> observer);


/**
* Subscribes a given SingleObserver (subclass) to this Single and returns the given
* SingleObserver as is.
* <p>Usage example:
* <pre><code>
* Single<Integer> source = Single.just(1);
* CompositeDisposable composite = new CompositeDisposable();
*
* class ResourceSingleObserver implements SingleObserver&lt;Integer>, Disposable {
* // ...
* }
*
* composite.add(source.subscribeWith(new ResourceSingleObserver()));
* </code></pre>
* @param <E> the type of the SingleObserver to use and return
* @param observer the SingleObserver (subclass) to use and return, not null
* @return the input {@code observer}
* @throws NullPointerException if {@code observer} is null
* @since 2.0
*/
public final <E extends SingleObserver<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}

/**
* Asynchronously subscribes subscribers to this Single on the specified {@link Scheduler}.
* <p>
Expand Down
76 changes: 76 additions & 0 deletions src/test/java/io/reactivex/internal/SubscribeWithTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal;

import org.junit.Test;
import static org.junit.Assert.*;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.TestObserver;
import io.reactivex.subscribers.TestSubscriber;

public class SubscribeWithTest {

@Test
public void withFlowable() {
Flowable.range(1, 10)
.subscribeWith(new TestSubscriber<Integer>())
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}


@Test
public void withObservable() {
Observable.range(1, 10)
.subscribeWith(new TestObserver<Integer>())
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}


class ObserverImpl implements SingleObserver<Object>, CompletableObserver {
Object value;

@Override
public void onSubscribe(Disposable d) {

}

@Override
public void onComplete() {
this.value = 100;
}

@Override
public void onSuccess(Object value) {
this.value = value;
}

@Override
public void onError(Throwable e) {
this.value = e;
}
}

@Test
public void withSingle() {
assertEquals(1, Single.just(1).subscribeWith(new ObserverImpl()).value);
}

@Test
public void withCompletable() {
assertEquals(100, Completable.complete().subscribeWith(new ObserverImpl()).value);
}

}