Skip to content

2.x: implement ops, add javadoc, remove unused components 8/19-2 #4378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 19, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ dependencies {
testCompile 'junit:junit:4.12'
testCompile 'org.mockito:mockito-core:1.10.19'

perfCompile 'org.openjdk.jmh:jmh-core:1.11.3'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.11.3'
perfCompile 'org.openjdk.jmh:jmh-core:1.13'
perfCompile 'org.openjdk.jmh:jmh-generator-annprocess:1.13'
}

javadoc {
exclude "**/rx/internal/**"
exclude "**/internal/**"
exclude "**/test/**"
exclude "**/perf/**"
options {
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/reactivex/BackpressureStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,21 @@

package io.reactivex;

/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* Buffer all values (unbounded) until there is a downstream demand for it.
*/
BUFFER,
/**
* Drop the value if there is no current demand for it from the downstream.
*/
DROP,
/**
* Have a latest value always available and overwrite it with more recent ones
* if there is no demand for it from the downstream.
*/
LATEST
}
41 changes: 2 additions & 39 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1430,25 +1430,6 @@ public final Disposable subscribe(final Action onComplete, final Consumer<? supe
return s;
}

/**
* Subscribes a non-backpressure Observer to this Completable instance which
* will receive only an onError or onComplete event.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the Observer's value type
* @param observer the Observer instance, not null
* @throws NullPointerException if s is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> void subscribe(final Observer<? super T> observer) {
Objects.requireNonNull(observer, "s is null");

ObserverCompletableObserver<T> os = new ObserverCompletableObserver<T>(observer);
subscribe(os);
}

/**
* Subscribes to this Completable and calls the given Action when this Completable
* completes normally.
Expand All @@ -1470,24 +1451,6 @@ public final Disposable subscribe(final Action onComplete) {
return s;
}

/**
* Subscribes a reactive-streams Subscriber to this Completable instance which
* will receive only an onError or onComplete event.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type of the subscriber
* @param s the reactive-streams Subscriber, not null
* @throws NullPointerException if s is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> void subscribe(Subscriber<T> s) {
Objects.requireNonNull(s, "s is null");
SubscriberCompletableObserver<T> os = new SubscriberCompletableObserver<T>(s);
subscribe(os);
}

/**
* Returns a Completable which subscribes the child subscriber on the specified scheduler, making
* sure the subscription side-effects happen on that specific thread of the scheduler.
Expand Down Expand Up @@ -1718,7 +1681,7 @@ public final Completable unsubscribeOn(final Scheduler scheduler) {
*/
public final TestSubscriber<Void> test() {
TestSubscriber<Void> ts = new TestSubscriber<Void>();
subscribe(ts);
subscribe(new SubscriberCompletableObserver<Void>(ts));
return ts;
}

Expand All @@ -1732,7 +1695,7 @@ public final TestSubscriber<Void> test() {
public final TestSubscriber<Void> test(boolean cancelled) {
TestSubscriber<Void> ts = new TestSubscriber<Void>();
ts.dispose();
subscribe(ts);
subscribe(new SubscriberCompletableObserver<Void>(ts));
return ts;
}
}
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/FlowableTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import io.reactivex.functions.Function;

/**
* Interface to compose observables.
* Interface to compose Flowables.
*
* @param <T> the upstream value type
* @param <R> the downstream value type
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface FlowableTransformer<T, R> extends Function<Flowable<T>, Publisher<? extends R>> {
public interface FlowableTransformer<Upstream, Downstream> extends Function<Flowable<Upstream>, Publisher<? extends Downstream>> {

}
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/ObservableOperator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

import io.reactivex.functions.Function;

/**
* Interface to map/wrap a downstream subscriber to an upstream Observer.
*
* @param <Downstream> the value type of the downstream
* @param <Upstream> the value type of the upstream
*/
public interface ObservableOperator<Downstream, Upstream> extends Function<Observer<? super Downstream>, Observer<? super Upstream>> {

}
6 changes: 6 additions & 0 deletions src/main/java/io/reactivex/ObservableTransformer.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@

import io.reactivex.functions.Function;

/**
* Interface to compose Observables.
*
* @param <Upstream> the upstream value type
* @param <Downstream> the downstream value type
*/
public interface ObservableTransformer<Upstream, Downstream> extends Function<Observable<Upstream>, ObservableSource<Downstream>> {

}
Loading