Skip to content

Commit 0a07ac1

Browse files
authored
2.x: cleanup based on IntelliJ 2017.1 inspections (#5222)
* 2.x: cleanup based on IntelliJ 2017.1 inspections * Restore indexed for-loop and make other loops indexed as well.
1 parent 3f6e570 commit 0a07ac1

40 files changed

+223
-279
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 59 additions & 59 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/FlowableEmitter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ public interface FlowableEmitter<T> extends Emitter<T> {
3333

3434
/**
3535
* Sets a Disposable on this emitter; any previous Disposable
36-
* or Cancellation will be unsubscribed/cancelled.
36+
* or Cancellation will be disposed/cancelled.
3737
* @param s the disposable, null is allowed
3838
*/
3939
void setDisposable(@Nullable Disposable s);
4040

4141
/**
4242
* Sets a Cancellable on this emitter; any previous Disposable
43-
* or Cancellation will be unsubscribed/cancelled.
43+
* or Cancellation will be disposed/cancelled.
4444
* @param c the cancellable resource, null is allowed
4545
*/
4646
void setCancellable(@Nullable Cancellable c);

src/main/java/io/reactivex/Maybe.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,7 @@ public static <T> Maybe<T> fromCallable(final Callable<? extends T> callable) {
660660
* return value of the {@link Future#get} method of that object, by passing the object into the {@code from}
661661
* method.
662662
* <p>
663-
* <em>Important note:</em> This Maybe is blocking; you cannot unsubscribe from it.
663+
* <em>Important note:</em> This Maybe is blocking; you cannot dispose it.
664664
* <p>
665665
* Unlike 1.x, cancelling the Maybe won't cancel the future. If necessary, one can use composition to achieve the
666666
* cancellation effect: {@code futureMaybe.doOnDispose(() -> future.cancel(true));}.
@@ -696,7 +696,7 @@ public static <T> Maybe<T> fromFuture(Future<? extends T> future) {
696696
* Unlike 1.x, cancelling the Maybe won't cancel the future. If necessary, one can use composition to achieve the
697697
* cancellation effect: {@code futureMaybe.doOnCancel(() -> future.cancel(true));}.
698698
* <p>
699-
* <em>Important note:</em> This Maybe is blocking on the thread it gets subscribed on; you cannot unsubscribe from it.
699+
* <em>Important note:</em> This Maybe is blocking on the thread it gets subscribed on; you cannot dispose it.
700700
* <dl>
701701
* <dt><b>Scheduler:</b></dt>
702702
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -2036,7 +2036,7 @@ public final T blockingGet(T defaultValue) {
20362036
* The operator subscribes only when the first downstream subscriber subscribes and maintains
20372037
* a single subscription towards this Maybe.
20382038
* <p>
2039-
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use the {@code cache}.
2039+
* <em>Note:</em> You sacrifice the ability to dispose the origin when you use the {@code cache}.
20402040
* <dl>
20412041
* <dt><b>Scheduler:</b></dt>
20422042
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/io/reactivex/Observable.java

Lines changed: 53 additions & 53 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Scheduler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
199199
* size thread pool:
200200
*
201201
* <pre>
202-
* Scheduler limitSched = Schedulers.computation().when(workers -> {
202+
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
203203
* // use merge max concurrent to limit the number of concurrent
204204
* // callbacks two at a time
205205
* return Completable.merge(Flowable.merge(workers), 2);
@@ -217,7 +217,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
217217
* subscription to the second.
218218
*
219219
* <pre>
220-
* Scheduler limitSched = Schedulers.computation().when(workers -> {
220+
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
221221
* // use merge max concurrent to limit the number of concurrent
222222
* // Flowables two at a time
223223
* return Completable.merge(Flowable.merge(workers, 2));
@@ -230,7 +230,7 @@ public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initial
230230
* bucket algorithm).
231231
*
232232
* <pre>
233-
* Scheduler slowSched = Schedulers.computation().when(workers -> {
233+
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
234234
* // use concatenate to make each worker happen one at a time.
235235
* return Completable.concat(workers.map(actions -> {
236236
* // delay the starting of the next worker by 1 second.
@@ -254,7 +254,7 @@ public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flow
254254
/**
255255
* Sequential Scheduler for executing actions on a single thread or event loop.
256256
* <p>
257-
* Unsubscribing the {@link Worker} cancels all outstanding work and allows resource cleanup.
257+
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
258258
*/
259259
public abstract static class Worker implements Disposable {
260260
/**

src/main/java/io/reactivex/Single.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
457457
* value of the {@link Future#get} method of that object, by passing the object into the {@code from}
458458
* method.
459459
* <p>
460-
* <em>Important note:</em> This Single is blocking; you cannot unsubscribe from it.
460+
* <em>Important note:</em> This Single is blocking; you cannot dispose it.
461461
* <dl>
462462
* <dt><b>Scheduler:</b></dt>
463463
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -486,7 +486,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future) {
486486
* the return value of the {@link Future#get} method of that object, by passing the object into the
487487
* {@code from} method.
488488
* <p>
489-
* <em>Important note:</em> This {@code Single} is blocking; you cannot unsubscribe from it.
489+
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
490490
* <dl>
491491
* <dt><b>Scheduler:</b></dt>
492492
* <dd>{@code fromFuture} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -519,7 +519,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout,
519519
* the return value of the {@link Future#get} method of that object, by passing the object into the
520520
* {@code from} method.
521521
* <p>
522-
* <em>Important note:</em> This {@code Single} is blocking; you cannot unsubscribe from it.
522+
* <em>Important note:</em> This {@code Single} is blocking; you cannot dispose it.
523523
* <dl>
524524
* <dt><b>Scheduler:</b></dt>
525525
* <dd>You specify the {@link Scheduler} where the blocking wait will happen.</dd>

src/main/java/io/reactivex/exceptions/CompositeException.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public final class CompositeException extends RuntimeException {
4949
*/
5050
public CompositeException(Throwable... exceptions) {
5151
this(exceptions == null ?
52-
Arrays.asList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
52+
Collections.singletonList(new NullPointerException("exceptions was null")) : Arrays.asList(exceptions));
5353
}
5454

5555
/**
@@ -129,7 +129,7 @@ public synchronized Throwable getCause() { // NOPMD
129129
chain.initCause(e);
130130
} catch (Throwable t) { // NOPMD
131131
// ignore
132-
// the javadocs say that some Throwables (depending on how they're made) will never
132+
// the JavaDocs say that some Throwables (depending on how they're made) will never
133133
// let me call initCause without blowing up even if it returns null
134134
}
135135
chain = getRootCause(chain);

src/main/java/io/reactivex/internal/fuseable/SimplePlainQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
/**
1919
* Override of the SimpleQueue interface with no throws Exception on poll().
2020
*
21-
* @param <T> the value type to enqueue and dequeue, not null
21+
* @param <T> the value type to offer and poll, not null
2222
*/
2323
public interface SimplePlainQueue<T> extends SimpleQueue<T> {
2424

src/main/java/io/reactivex/internal/fuseable/SimpleQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
/**
1919
* A minimalist queue interface without the method bloat of java.util.Collection and java.util.Queue.
2020
*
21-
* @param <T> the value type to enqueue and dequeue, not null
21+
* @param <T> the value type to offer and poll, not null
2222
*/
2323
public interface SimpleQueue<T> {
2424

src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void subscribeActual(final CompletableObserver s) {
4848
sources = b;
4949
}
5050
sources[count++] = element;
51-
};
51+
}
5252
} catch (Throwable e) {
5353
Exceptions.throwIfFatal(e);
5454
EmptyDisposable.error(e, s);

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ public void removeChild(ReplaySubscription<T> p) {
167167
ReplaySubscription<T>[] b;
168168
if (n == 1) {
169169
b = EMPTY;
170-
return;
171170
} else {
172171
b = new ReplaySubscription[n - 1];
173172
System.arraycopy(a, 0, b, 0, j);

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletable.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ public void onError(Throwable e) {
129129
if (decrementAndGet() == 0) {
130130
Throwable ex = errors.terminate();
131131
actual.onError(ex);
132-
return;
133132
} else {
134133
if (maxConcurrency != Integer.MAX_VALUE) {
135134
s.request(1);
@@ -140,7 +139,6 @@ public void onError(Throwable e) {
140139
if (getAndSet(0) > 0) {
141140
Throwable ex = errors.terminate();
142141
actual.onError(ex);
143-
return;
144142
}
145143
}
146144
} else {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,6 @@ public void onError(Throwable e) {
136136
if (decrementAndGet() == 0) {
137137
Throwable ex = errors.terminate();
138138
actual.onError(ex);
139-
return;
140139
} else {
141140
if (maxConcurrency != Integer.MAX_VALUE) {
142141
s.request(1);
@@ -147,7 +146,6 @@ public void onError(Throwable e) {
147146
if (getAndSet(0) > 0) {
148147
Throwable ex = errors.terminate();
149148
actual.onError(ex);
150-
return;
151149
}
152150
}
153151
} else {

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlattenIterable.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,6 @@ void drain() {
270270
} catch (Throwable ex) {
271271
Exceptions.throwIfFatal(ex);
272272
s.cancel();
273-
it = null;
274273
ExceptionHelper.addThrowable(error, ex);
275274
ex = ExceptionHelper.terminate(error);
276275
a.onError(ex);

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindow.java

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,6 @@ static final class WindowExactSubscriber<T>
7272

7373
UnicastProcessor<T> window;
7474

75-
boolean done;
76-
7775
WindowExactSubscriber(Subscriber<? super Flowable<T>> actual, long size, int bufferSize) {
7876
super(1);
7977
this.actual = actual;
@@ -92,10 +90,6 @@ public void onSubscribe(Subscription s) {
9290

9391
@Override
9492
public void onNext(T t) {
95-
if (done) {
96-
return;
97-
}
98-
9993
long i = index;
10094

10195
UnicastProcessor<T> w = window;
@@ -123,10 +117,6 @@ public void onNext(T t) {
123117

124118
@Override
125119
public void onError(Throwable t) {
126-
if (done) {
127-
RxJavaPlugins.onError(t);
128-
return;
129-
}
130120
Processor<T, T> w = window;
131121
if (w != null) {
132122
window = null;
@@ -138,10 +128,6 @@ public void onError(Throwable t) {
138128

139129
@Override
140130
public void onComplete() {
141-
if (done) {
142-
return;
143-
}
144-
145131
Processor<T, T> w = window;
146132
if (w != null) {
147133
window = null;
@@ -199,8 +185,6 @@ static final class WindowSkipSubscriber<T>
199185

200186
UnicastProcessor<T> window;
201187

202-
boolean done;
203-
204188
WindowSkipSubscriber(Subscriber<? super Flowable<T>> actual, long size, long skip, int bufferSize) {
205189
super(1);
206190
this.actual = actual;
@@ -221,10 +205,6 @@ public void onSubscribe(Subscription s) {
221205

222206
@Override
223207
public void onNext(T t) {
224-
if (done) {
225-
return;
226-
}
227-
228208
long i = index;
229209

230210
UnicastProcessor<T> w = window;
@@ -258,10 +238,6 @@ public void onNext(T t) {
258238

259239
@Override
260240
public void onError(Throwable t) {
261-
if (done) {
262-
RxJavaPlugins.onError(t);
263-
return;
264-
}
265241
Processor<T, T> w = window;
266242
if (w != null) {
267243
window = null;
@@ -273,10 +249,6 @@ public void onError(Throwable t) {
273249

274250
@Override
275251
public void onComplete() {
276-
if (done) {
277-
return;
278-
}
279-
280252
Processor<T, T> w = window;
281253
if (w != null) {
282254
window = null;

src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -788,15 +788,13 @@ void drainLoop() {
788788
worker.schedule(new Completion(w), timespan, unit);
789789
} else {
790790
a.onError(new MissingBackpressureException("Can't emit window due to lack of requests"));
791-
continue;
792791
}
793792
} else {
794793
ws.remove(work.w);
795794
work.w.onComplete();
796795
if (ws.isEmpty() && cancelled) {
797796
terminated = true;
798797
}
799-
continue;
800798
}
801799
} else {
802800
for (UnicastProcessor<T> w : ws) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableZip.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,6 @@ static final class ZipCoordinator<T, R>
9898

9999
final boolean delayErrors;
100100

101-
volatile boolean done;
102-
103101
volatile boolean cancelled;
104102

105103
final Object[] current;
@@ -112,7 +110,7 @@ static final class ZipCoordinator<T, R>
112110
@SuppressWarnings("unchecked")
113111
ZipSubscriber<T, R>[] a = new ZipSubscriber[n];
114112
for (int i = 0; i < n; i++) {
115-
a[i] = new ZipSubscriber<T, R>(this, prefetch, i);
113+
a[i] = new ZipSubscriber<T, R>(this, prefetch);
116114
}
117115
this.current = new Object[n];
118116
this.subscribers = a;
@@ -123,7 +121,7 @@ static final class ZipCoordinator<T, R>
123121
void subscribe(Publisher<? extends T>[] sources, int n) {
124122
ZipSubscriber<T, R>[] a = subscribers;
125123
for (int i = 0; i < n; i++) {
126-
if (done || cancelled || (!delayErrors && errors.get() != null)) {
124+
if (cancelled || (!delayErrors && errors.get() != null)) {
127125
return;
128126
}
129127
sources[i].subscribe(a[i]);
@@ -333,8 +331,6 @@ static final class ZipSubscriber<T, R> extends AtomicReference<Subscription> imp
333331

334332
final int limit;
335333

336-
final int index;
337-
338334
SimpleQueue<T> queue;
339335

340336
long produced;
@@ -343,10 +339,9 @@ static final class ZipSubscriber<T, R> extends AtomicReference<Subscription> imp
343339

344340
int sourceMode;
345341

346-
ZipSubscriber(ZipCoordinator<T, R> parent, int prefetch, int index) {
342+
ZipSubscriber(ZipCoordinator<T, R> parent, int prefetch) {
347343
this.parent = parent;
348344
this.prefetch = prefetch;
349-
this.index = index;
350345
this.limit = prefetch - (prefetch >> 2);
351346
}
352347

0 commit comments

Comments
 (0)