Skip to content

Commit 0094304

Browse files
authored
2.x Improve coverage, fix bugs (#4430)
* 2.x Improve coverage, fix bugs * Update to ExceptionHelper.wrapOrThrow
1 parent 6dbeff4 commit 0094304

File tree

60 files changed

+3430
-211
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

60 files changed

+3430
-211
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
import java.util.concurrent.*;
1616

17-
import org.reactivestreams.*;
17+
import org.reactivestreams.Publisher;
1818

1919
import io.reactivex.annotations.SchedulerSupport;
2020
import io.reactivex.disposables.Disposable;
@@ -24,8 +24,9 @@
2424
import io.reactivex.internal.operators.completable.*;
2525
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
2626
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
27-
import io.reactivex.internal.operators.single.*;
27+
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
2828
import io.reactivex.internal.subscribers.completable.*;
29+
import io.reactivex.internal.util.ExceptionHelper;
2930
import io.reactivex.plugins.RxJavaPlugins;
3031
import io.reactivex.schedulers.Schedulers;
3132
import io.reactivex.subscribers.TestSubscriber;
@@ -197,7 +198,6 @@ public static Completable concat(Publisher<? extends CompletableSource> sources,
197198
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
198199
* @return the new Completable instance
199200
* @see FlowableOnSubscribe
200-
* @see FlowableEmitter.BackpressureMode
201201
* @see Cancellable
202202
*/
203203
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1612,7 +1612,7 @@ public final <U> U to(Function<? super Completable, U> converter) {
16121612
return converter.apply(this);
16131613
} catch (Throwable ex) {
16141614
Exceptions.throwIfFatal(ex);
1615-
throw Exceptions.propagate(ex);
1615+
throw ExceptionHelper.wrapOrThrow(ex);
16161616
}
16171617
}
16181618

src/main/java/io/reactivex/Flowable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4848,7 +4848,7 @@ public final void blockingForEach(Consumer<? super T> onNext) {
48484848
} catch (Throwable e) {
48494849
Exceptions.throwIfFatal(e);
48504850
((Disposable)it).dispose();
4851-
throw Exceptions.propagate(e);
4851+
throw ExceptionHelper.wrapOrThrow(e);
48524852
}
48534853
}
48544854
}
@@ -13181,7 +13181,7 @@ public final <R> R to(Function<? super Flowable<T>, R> converter) {
1318113181
return converter.apply(this);
1318213182
} catch (Throwable ex) {
1318313183
Exceptions.throwIfFatal(ex);
13184-
throw Exceptions.propagate(ex);
13184+
throw ExceptionHelper.wrapOrThrow(ex);
1318513185
}
1318613186
}
1318713187

src/main/java/io/reactivex/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4346,7 +4346,7 @@ public final void blockingForEach(Consumer<? super T> onNext) {
43464346
} catch (Throwable e) {
43474347
Exceptions.throwIfFatal(e);
43484348
((Disposable)it).dispose();
4349-
throw Exceptions.propagate(e);
4349+
throw ExceptionHelper.wrapOrThrow(e);
43504350
}
43514351
}
43524352
}
@@ -11154,7 +11154,7 @@ public final <R> R to(Function<? super Observable<T>, R> converter) {
1115411154
return converter.apply(this);
1115511155
} catch (Throwable ex) {
1115611156
Exceptions.throwIfFatal(ex);
11157-
throw Exceptions.propagate(ex);
11157+
throw ExceptionHelper.wrapOrThrow(ex);
1115811158
}
1115911159
}
1116011160

src/main/java/io/reactivex/Scheduler.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.internal.disposables.SequentialDisposable;
21+
import io.reactivex.internal.util.ExceptionHelper;
2122
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
/**
@@ -37,6 +38,16 @@ public abstract class Scheduler {
3738
Long.getLong("rx2.scheduler.drift-tolerance", 15));
3839
}
3940

41+
/**
42+
* Returns the clock drift tolerance in nanoseconds.
43+
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
44+
* @return the tolerance in nanoseconds
45+
* @since 2.0
46+
*/
47+
public static long clockDriftTolerance() {
48+
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
49+
}
50+
4051

4152
/**
4253
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
@@ -294,7 +305,7 @@ public void run() {
294305
} catch (Throwable ex) {
295306
Exceptions.throwIfFatal(ex);
296307
worker.dispose();
297-
throw Exceptions.propagate(ex);
308+
throw ExceptionHelper.wrapOrThrow(ex);
298309
}
299310
}
300311
}

src/main/java/io/reactivex/Single.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.reactivex.internal.operators.flowable.*;
2626
import io.reactivex.internal.operators.single.*;
2727
import io.reactivex.internal.subscribers.single.*;
28-
import io.reactivex.internal.util.ErrorMode;
28+
import io.reactivex.internal.util.*;
2929
import io.reactivex.plugins.RxJavaPlugins;
3030
import io.reactivex.schedulers.Schedulers;
3131
import io.reactivex.subscribers.TestSubscriber;
@@ -249,7 +249,6 @@ public static <T> Flowable<T> concat(
249249
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
250250
* @return the new Single instance
251251
* @see FlowableOnSubscribe
252-
* @see FlowableEmitter.BackpressureMode
253252
* @see Cancellable
254253
*/
255254
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
@@ -2366,7 +2365,7 @@ public final <R> R to(Function<? super Single<T>, R> convert) {
23662365
return convert.apply(this);
23672366
} catch (Throwable ex) {
23682367
Exceptions.throwIfFatal(ex);
2369-
throw Exceptions.propagate(ex);
2368+
throw ExceptionHelper.wrapOrThrow(ex);
23702369
}
23712370
}
23722371

src/main/java/io/reactivex/disposables/ActionDisposable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
*/
1313
package io.reactivex.disposables;
1414

15-
import io.reactivex.exceptions.Exceptions;
1615
import io.reactivex.functions.Action;
16+
import io.reactivex.internal.util.ExceptionHelper;
1717

1818
final class ActionDisposable extends ReferenceDisposable<Action> {
1919
/** */
@@ -28,8 +28,7 @@ protected void onDisposed(Action value) {
2828
try {
2929
value.run();
3030
} catch (Throwable ex) {
31-
Exceptions.throwIfFatal(ex);
32-
throw Exceptions.propagate(ex);
31+
throw ExceptionHelper.wrapOrThrow(ex);
3332
}
3433
}
3534
}

src/main/java/io/reactivex/disposables/CompositeDisposable.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import io.reactivex.exceptions.*;
1818
import io.reactivex.internal.disposables.DisposableContainer;
1919
import io.reactivex.internal.functions.ObjectHelper;
20-
import io.reactivex.internal.util.OpenHashSet;
20+
import io.reactivex.internal.util.*;
2121

2222
/**
2323
* A disposable container that can hold onto multiple other disposables and
@@ -54,6 +54,7 @@ public CompositeDisposable(Disposable... resources) {
5454
*/
5555
public CompositeDisposable(Iterable<? extends Disposable> resources) {
5656
ObjectHelper.requireNonNull(resources, "resources is null");
57+
this.resources = new OpenHashSet<Disposable>();
5758
for (Disposable d : resources) {
5859
ObjectHelper.requireNonNull(d, "Disposable item is null");
5960
this.resources.add(d);
@@ -193,7 +194,8 @@ public int size() {
193194
if (disposed) {
194195
return 0;
195196
}
196-
return resources.size();
197+
OpenHashSet<Disposable> set = resources;
198+
return set != null ? set.size() : 0;
197199
}
198200
}
199201

@@ -223,7 +225,7 @@ void dispose(OpenHashSet<Disposable> set) {
223225
}
224226
if (errors != null) {
225227
if (errors.size() == 1) {
226-
throw Exceptions.propagate(errors.get(0));
228+
throw ExceptionHelper.wrapOrThrow(errors.get(0));
227229
}
228230
throw new CompositeException(errors);
229231
}

src/main/java/io/reactivex/disposables/SerialDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,6 @@ public void dispose() {
8181

8282
@Override
8383
public boolean isDisposed() {
84-
return DisposableHelper.isDisposed(get());
84+
return DisposableHelper.isDisposed(resource.get());
8585
}
8686
}

src/main/java/io/reactivex/exceptions/CompositeException.java

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -205,9 +205,7 @@ private void printStackTrace(PrintStreamOrWriter s) {
205205
appendStackTrace(b, ex, "\t");
206206
i++;
207207
}
208-
synchronized (s.lock()) {
209-
s.println(b.toString());
210-
}
208+
s.println(b.toString());
211209
}
212210

213211
private void appendStackTrace(StringBuilder b, Throwable ex, String prefix) {
@@ -222,9 +220,6 @@ private void appendStackTrace(StringBuilder b, Throwable ex, String prefix) {
222220
}
223221

224222
abstract static class PrintStreamOrWriter {
225-
/** Returns the object to be locked when using this StreamOrWriter */
226-
abstract Object lock();
227-
228223
/** Prints the specified string as a line on this StreamOrWriter */
229224
abstract void println(Object o);
230225
}
@@ -239,11 +234,6 @@ static final class WrappedPrintStream extends PrintStreamOrWriter {
239234
this.printStream = printStream;
240235
}
241236

242-
@Override
243-
Object lock() {
244-
return printStream;
245-
}
246-
247237
@Override
248238
void println(Object o) {
249239
printStream.println(o);
@@ -257,11 +247,6 @@ static final class WrappedPrintWriter extends PrintStreamOrWriter {
257247
this.printWriter = printWriter;
258248
}
259249

260-
@Override
261-
Object lock() {
262-
return printWriter;
263-
}
264-
265250
@Override
266251
void println(Object o) {
267252
printWriter.println(o);
@@ -310,7 +295,7 @@ public int size() {
310295
* any suppressed exceptions.
311296
*/
312297
public boolean isEmpty() {
313-
return exceptions.isEmpty() && getCause() == null;
298+
return exceptions.isEmpty();
314299
}
315300

316301
/**
@@ -321,16 +306,15 @@ public boolean isEmpty() {
321306
*/
322307
private Throwable getRootCause(Throwable e) {
323308
Throwable root = e.getCause();
324-
if (root == null || root == e) {
309+
if (root == null /* || cause == root */) { // case might not be possible
325310
return e;
326-
} else {
327-
while(true) {
328-
Throwable cause = root.getCause();
329-
if (cause == null || cause == root) {
330-
return root;
331-
}
332-
root = root.getCause();
311+
}
312+
while(true) {
313+
Throwable cause = root.getCause();
314+
if (cause == null /* || cause == root */) { // case might not be possible
315+
return root;
333316
}
317+
root = root.getCause();
334318
}
335319
}
336320
}

src/main/java/io/reactivex/exceptions/Exceptions.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.exceptions;
1515

16+
import io.reactivex.internal.util.ExceptionHelper;
17+
1618
/**
1719
* Utility class to help propagate checked exceptions and rethrow exceptions
1820
* designated as fatal.
@@ -39,13 +41,7 @@ public static RuntimeException propagate(Throwable t) {
3941
* Even though nothing will return and throw via that 'throw', it allows the code to look like it
4042
* so it's easy to read and understand that it will always result in a throw.
4143
*/
42-
if (t instanceof RuntimeException) {
43-
throw (RuntimeException) t;
44-
} else if (t instanceof Error) {
45-
throw (Error) t;
46-
} else {
47-
throw new RuntimeException(t); // NOPMD
48-
}
44+
throw ExceptionHelper.wrapOrThrow(t);
4945
}
5046

5147
/**

src/main/java/io/reactivex/exceptions/MissingBackpressureException.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,15 @@
1616
/**
1717
* Indicates that an operator attempted to emit a value but the downstream wasn't ready for it.
1818
*/
19-
public class MissingBackpressureException extends RuntimeException {
19+
public final class MissingBackpressureException extends RuntimeException {
2020
/** */
2121
private static final long serialVersionUID = 8517344746016032542L;
2222

2323
/**
2424
* Constructs a MissingBackpressureException without message or cause.
2525
*/
2626
public MissingBackpressureException() {
27-
super();
27+
// no message
2828
}
2929

3030
/**
@@ -35,12 +35,4 @@ public MissingBackpressureException(String message) {
3535
super(message);
3636
}
3737

38-
/**
39-
* Constructs a MissingBackpressureException with the given message and cause.
40-
* @param message the error message
41-
* @param cause the cause Throwable
42-
*/
43-
public MissingBackpressureException(String message, Throwable cause) {
44-
super(message, cause);
45-
}
4638
}

src/main/java/io/reactivex/internal/disposables/ListCompositeDisposable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.exceptions.*;
1919
import io.reactivex.internal.functions.ObjectHelper;
20+
import io.reactivex.internal.util.ExceptionHelper;
2021

2122
/**
2223
* A disposable container that can hold onto multiple other disposables.
@@ -177,7 +178,7 @@ void dispose(List<Disposable> set) {
177178
}
178179
if (errors != null) {
179180
if (errors.size() == 1) {
180-
throw Exceptions.propagate(errors.get(0));
181+
throw ExceptionHelper.wrapOrThrow(errors.get(0));
181182
}
182183
throw new CompositeException(errors);
183184
}

0 commit comments

Comments
 (0)