Skip to content

2.x: cleanup based on IntelliJ 2017.1 inspections #5222

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 59 additions & 59 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/FlowableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ public interface FlowableEmitter<T> extends Emitter<T> {

/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* or Cancellation will be disposed/cancelled.
* @param s the disposable, null is allowed
*/
void setDisposable(@Nullable Disposable s);

/**
* Sets a Cancellable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
* or Cancellation will be disposed/cancelled.
* @param c the cancellable resource, null is allowed
*/
void setCancellable(@Nullable Cancellable c);
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ public static <T> Maybe<T> fromCallable(final Callable<? extends T> callable) {
* return value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
* <p>
* <em>Important note:</em> This Maybe is blocking; you cannot unsubscribe from it.
* <em>Important note:</em> This Maybe is blocking; you cannot dispose it.
* <p>
* Unlike 1.x, cancelling the Maybe won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futureMaybe.doOnDispose(() -> future.cancel(true));}.
Expand Down Expand Up @@ -696,7 +696,7 @@ public static <T> Maybe<T> fromFuture(Future<? extends T> future) {
* Unlike 1.x, cancelling the Maybe won't cancel the future. If necessary, one can use composition to achieve the
* cancellation effect: {@code futureMaybe.doOnCancel(() -> future.cancel(true));}.
* <p>
* <em>Important note:</em> This Maybe is blocking on the thread it gets subscribed on; you cannot unsubscribe from it.
* <em>Important note:</em> This Maybe is blocking on the thread it gets subscribed on; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -2036,7 +2036,7 @@ public final T blockingGet(T defaultValue) {
* The operator subscribes only when the first downstream subscriber subscribes and maintains
* a single subscription towards this Maybe.
* <p>
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the {@code cache}.
* <em>Note:</em> You sacrifice the ability to dispose the origin when you use the {@code cache}.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down
106 changes: 53 additions & 53 deletions src/main/java/io/reactivex/Observable.java

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* size thread pool:
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Flowable.merge(workers), 2);
Expand All @@ -217,7 +217,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* subscription to the second.
*
* <pre>
* Scheduler limitSched = Schedulers.computation().when(workers -> {
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Flowables two at a time
* return Completable.merge(Flowable.merge(workers, 2));
Expand All @@ -230,7 +230,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
* bucket algorithm).
*
* <pre>
* Scheduler slowSched = Schedulers.computation().when(workers -> {
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
Expand All @@ -254,7 +254,7 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resource cleanup.
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public abstract static class Worker implements Disposable {
/**
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
* value of the {@link Future#get} method of that object, by passing the object into the {@code from}
* method.
* <p>
* <em>Important note:</em> This Single is blocking; you cannot unsubscribe from it.
* <em>Important note:</em> This Single is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -486,7 +486,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future) {
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <p>
* <em>Important note:</em> This {@code Single} is blocking; you cannot unsubscribe from it.
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
Expand Down Expand Up @@ -519,7 +519,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout,
* the return value of the {@link Future#get} method of that object, by passing the object into the
* {@code from} method.
* <p>
* <em>Important note:</em> This {@code Single} is blocking; you cannot unsubscribe from it.
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify the {@link Scheduler} where the blocking wait will happen.</dd>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class CompositeException extends RuntimeException {
*/
public CompositeException(Throwable... exceptions) {
this(exceptions == null ?
Arrays.asList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
Collections.singletonList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
}

/**
Expand Down Expand Up @@ -129,7 +129,7 @@ public synchronized Throwable getCause() { // NOPMD
chain.initCause(e);
} catch (Throwable t) { // NOPMD
// ignore
// the javadocs say that some Throwables (depending on how they're made) will never
// the JavaDocs say that some Throwables (depending on how they're made) will never
// let me call initCause without blowing up even if it returns null
}
chain = getRootCause(chain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/**
* Override of the SimpleQueue interface with no throws Exception on poll().
*
* @param <T> the value type to enqueue and dequeue, not null
* @param <T> the value type to offer and poll, not null
*/
public interface SimplePlainQueue<T> extends SimpleQueue<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
/**
* A minimalist queue interface without the method bloat of java.util.Collection and java.util.Queue.
*
* @param <T> the value type to enqueue and dequeue, not null
* @param <T> the value type to offer and poll, not null
*/
public interface SimpleQueue<T> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void subscribeActual(final CompletableObserver s) {
sources = b;
}
sources[count++] = element;
};
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, s);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public void removeChild(ReplaySubscription<T> p) {
ReplaySubscription<T>[] b;
if (n == 1) {
b = EMPTY;
return;
} else {
b = new ReplaySubscription[n - 1];
System.arraycopy(a, 0, b, 0, j);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ public void onError(Throwable e) {
if (decrementAndGet() == 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
Expand All @@ -140,7 +139,6 @@ public void onError(Throwable e) {
if (getAndSet(0) > 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ public void onError(Throwable e) {
if (decrementAndGet() == 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
} else {
if (maxConcurrency != Integer.MAX_VALUE) {
s.request(1);
Expand All @@ -147,7 +146,6 @@ public void onError(Throwable e) {
if (getAndSet(0) > 0) {
Throwable ex = errors.terminate();
actual.onError(ex);
return;
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ void drain() {
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.cancel();
it = null;
ExceptionHelper.addThrowable(error, ex);
ex = ExceptionHelper.terminate(error);
a.onError(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ static final class WindowExactSubscriber<T>

UnicastProcessor<T> window;

boolean done;

WindowExactSubscriber(Subscriber<? super Flowable<T>> actual, long size, int bufferSize) {
super(1);
this.actual = actual;
Expand All @@ -92,10 +90,6 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (done) {
return;
}

long i = index;

UnicastProcessor<T> w = window;
Expand Down Expand Up @@ -123,10 +117,6 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
Processor<T, T> w = window;
if (w != null) {
window = null;
Expand All @@ -138,10 +128,6 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
return;
}

Processor<T, T> w = window;
if (w != null) {
window = null;
Expand Down Expand Up @@ -199,8 +185,6 @@ static final class WindowSkipSubscriber<T>

UnicastProcessor<T> window;

boolean done;

WindowSkipSubscriber(Subscriber<? super Flowable<T>> actual, long size, long skip, int bufferSize) {
super(1);
this.actual = actual;
Expand All @@ -221,10 +205,6 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
if (done) {
return;
}

long i = index;

UnicastProcessor<T> w = window;
Expand Down Expand Up @@ -258,10 +238,6 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
Processor<T, T> w = window;
if (w != null) {
window = null;
Expand All @@ -273,10 +249,6 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
return;
}

Processor<T, T> w = window;
if (w != null) {
window = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,13 @@ void drainLoop() {
worker.schedule(new Completion(w), timespan, unit);
} else {
a.onError(new MissingBackpressureException("Can't emit window due to lack of requests"));
continue;
}
} else {
ws.remove(work.w);
work.w.onComplete();
if (ws.isEmpty() && cancelled) {
terminated = true;
}
continue;
}
} else {
for (UnicastProcessor<T> w : ws) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ static final class ZipCoordinator<T, R>

final boolean delayErrors;

volatile boolean done;

volatile boolean cancelled;

final Object[] current;
Expand All @@ -112,7 +110,7 @@ static final class ZipCoordinator<T, R>
@SuppressWarnings("unchecked")
ZipSubscriber<T, R>[] a = new ZipSubscriber[n];
for (int i = 0; i < n; i++) {
a[i] = new ZipSubscriber<T, R>(this, prefetch, i);
a[i] = new ZipSubscriber<T, R>(this, prefetch);
}
this.current = new Object[n];
this.subscribers = a;
Expand All @@ -123,7 +121,7 @@ static final class ZipCoordinator<T, R>
void subscribe(Publisher<? extends T>[] sources, int n) {
ZipSubscriber<T, R>[] a = subscribers;
for (int i = 0; i < n; i++) {
if (done || cancelled || (!delayErrors && errors.get() != null)) {
if (cancelled || (!delayErrors && errors.get() != null)) {
return;
}
sources[i].subscribe(a[i]);
Expand Down Expand Up @@ -333,8 +331,6 @@ static final class ZipSubscriber<T, R> extends AtomicReference<Subscription> imp

final int limit;

final int index;

SimpleQueue<T> queue;

long produced;
Expand All @@ -343,10 +339,9 @@ static final class ZipSubscriber<T, R> extends AtomicReference<Subscription> imp

int sourceMode;

ZipSubscriber(ZipCoordinator<T, R> parent, int prefetch, int index) {
ZipSubscriber(ZipCoordinator<T, R> parent, int prefetch) {
this.parent = parent;
this.prefetch = prefetch;
this.index = index;
this.limit = prefetch - (prefetch >> 2);
}

Expand Down
Loading