Skip to content

2.x: cleanup for text and javadoc 04/15 #5286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3566,10 +3566,10 @@ public final Maybe<T> retryUntil(final BooleanSupplier stop) {
* This retries 3 times, each time incrementing the number of seconds it waits.
*
* <pre><code>
* Publisher.create((Subscriber<? super String> s) -> {
* Flowable.create((FlowableEmitter<? super String> s) -> {
* System.out.println("subscribing");
* s.onError(new RuntimeException("always fails"));
* }).retryWhen(attempts -> {
* }, BackpressureStrategy.BUFFER).retryWhen(attempts -> {
* return attempts.zipWith(Publisher.range(1, 3), (n, i) -> i).flatMap(i -> {
* System.out.println("delay retry by " + i + " second(s)");
* return Publisher.timer(i, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.reactivex.annotations.Nullable;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.*;
import io.reactivex.internal.queue.SpscArrayQueue;
import io.reactivex.internal.subscriptions.*;
Expand Down Expand Up @@ -298,7 +299,7 @@ void drain() {
R v;

try {
v = it.next();
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
current = null;
Expand Down Expand Up @@ -437,7 +438,7 @@ public R poll() throws Exception {
current = it;
}

R r = it.next();
R r = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");

if (!it.hasNext()) {
current = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ private ObservableBlockingSubscribe() {
/**
* Subscribes to the source and calls the Observer methods on the current thread.
* <p>
* @param o the source publisher
* @param o the source ObservableSource
* The call to dispose() is composed through.
* @param observer the subscriber to forward events and calls to in the current thread
* @param <T> the value type
Expand Down Expand Up @@ -70,7 +70,7 @@ public static <T> void subscribe(ObservableSource<? extends T> o, Observer<? sup

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* @param o the source publisher
* @param o the source ObservableSource
* @param <T> the value type
*/
public static <T> void subscribe(ObservableSource<? extends T> o) {
Expand All @@ -89,7 +89,7 @@ public static <T> void subscribe(ObservableSource<? extends T> o) {

/**
* Subscribes to the source and calls the given actions on the current thread.
* @param o the source publisher
* @param o the source ObservableSource
* @param onNext the callback action for each source value
* @param onError the callback action for an error event
* @param onComplete the callback action for the completion event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void onSubscribe(Disposable s) {
ObservableSource<B> boundary;

try {
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null");
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
Expand Down Expand Up @@ -179,7 +179,7 @@ void next() {
ObservableSource<B> boundary;

try {
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary publisher supplied is null");
boundary = ObjectHelper.requireNonNull(boundarySupplier.call(), "The boundary ObservableSource supplied is null");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
cancelled = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void onNext(T t) {
ObservableSource<U> p;

try {
p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The publisher supplied is null");
p = ObjectHelper.requireNonNull(debounceSelector.apply(t), "The ObservableSource supplied is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public ObservableDefer(Callable<? extends ObservableSource<? extends T>> supplie
public void subscribeActual(Observer<? super T> s) {
ObservableSource<? extends T> pub;
try {
pub = ObjectHelper.requireNonNull(supplier.call(), "null publisher supplied");
pub = ObjectHelper.requireNonNull(supplier.call(), "null ObservableSource supplied");
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
EmptyDisposable.error(t, s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void onNext(T t) {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext publisher returned is null");
p = ObjectHelper.requireNonNull(onNextMapper.apply(t), "The onNext ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand All @@ -103,7 +103,7 @@ public void onError(Throwable t) {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null");
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand All @@ -119,7 +119,7 @@ public void onComplete() {
ObservableSource<? extends R> p;

try {
p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete publisher returned is null");
p = ObjectHelper.requireNonNull(onCompleteSupplier.call(), "The onComplete ObservableSource returned is null");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
actual.onError(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.reactivex.observables;

import io.reactivex.annotations.NonNull;
import org.reactivestreams.Subscriber;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
Expand All @@ -25,9 +24,9 @@
import io.reactivex.plugins.RxJavaPlugins;

/**
* A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
* emitting items when it is subscribed to, but only when its {@link #connect} method is called. In this way you
* can wait for all intended {@link Subscriber}s to {@link Flowable#subscribe} to the {@code Observable}
* can wait for all intended {@link Observer}s to {@link Observable#subscribe} to the {@code Observable}
* before the {@code Observable} begins emitting items.
* <p>
* <img width="640" height="510" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/publishConnect.png" alt="">
Expand All @@ -41,7 +40,7 @@ public abstract class ConnectableObservable<T> extends Observable<T> {

/**
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
* {@link Observable} to its {@link Observer}s.
*
* @param connection
* the action that receives the connection subscription before the subscription to source happens
Expand All @@ -52,7 +51,7 @@ public abstract class ConnectableObservable<T> extends Observable<T> {

/**
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
* {@link Flowable} to its {@link Subscriber}s.
* {@link Observable} to its {@link Observer}s.
* <p>
* To disconnect from a synchronous source, use the {@link #connect(Consumer)} method.
*
Expand All @@ -79,18 +78,18 @@ public Observable<T> refCount() {

/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes.
* when the first Observer subscribes.
*
* @return an Observable that automatically connects to this ConnectableObservable
* when the first Subscriber subscribes
* when the first Observer subscribes
*/
@NonNull
public Observable<T> autoConnect() {
return autoConnect(1);
}
/**
* Returns an Observable that automatically connects to this ConnectableObservable
* when the specified number of Subscribers subscribe to it.
* when the specified number of Observers subscribe to it.
*
* @param numberOfSubscribers the number of subscribers to await before calling connect
* on the ConnectableObservable. A non-positive value indicates
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/subjects/SerializedSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import io.reactivex.plugins.RxJavaPlugins;

/**
* Serializes calls to the Subscriber methods.
* <p>All other Publisher and Subject methods are thread-safe by design.
* Serializes calls to the Observer methods.
* <p>All other Observable and Subject methods are thread-safe by design.
*
* @param <T> the item value type
*/
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/reactivex/single/SingleSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public void errorIsDisposed() {
@Test
public void biConsumerIsDisposedOnSuccess() {
final Object[] result = { null, null };

Disposable d = Single.just(1)
.subscribe(new BiConsumer<Integer, Throwable>() {
@Override
Expand All @@ -241,7 +241,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
result[1] = t2;
}
});

assertTrue("Not disposed?!", d.isDisposed());
assertEquals(1, result[0]);
assertNull(result[1]);
Expand All @@ -250,7 +250,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
@Test
public void biConsumerIsDisposedOnError() {
final Object[] result = { null, null };

Disposable d = Single.<Integer>error(new IOException())
.subscribe(new BiConsumer<Integer, Throwable>() {
@Override
Expand All @@ -259,7 +259,7 @@ public void accept(Integer t1, Throwable t2) throws Exception {
result[1] = t2;
}
});

assertTrue("Not disposed?!", d.isDisposed());
assertNull(result[0]);
assertTrue("" + result[1], result[1] instanceof IOException);
Expand Down