Skip to content

2.x Improve coverage, fix bugs #4430

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 26, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import java.util.concurrent.*;

import org.reactivestreams.*;
import org.reactivestreams.Publisher;

import io.reactivex.annotations.SchedulerSupport;
import io.reactivex.disposables.Disposable;
Expand All @@ -24,8 +24,9 @@
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
import io.reactivex.internal.subscribers.completable.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -197,7 +198,6 @@ public static Completable concat(Publisher<? extends CompletableSource> sources,
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
* @return the new Completable instance
* @see FlowableOnSubscribe
* @see FlowableEmitter.BackpressureMode
* @see Cancellable
*/
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -1612,7 +1612,7 @@ public final <U> U to(Function<? super Completable, U> converter) {
return converter.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4848,7 +4848,7 @@ public final void blockingForEach(Consumer<? super T> onNext) {
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
((Disposable)it).dispose();
throw Exceptions.propagate(e);
throw ExceptionHelper.wrapOrThrow(e);
}
}
}
Expand Down Expand Up @@ -13181,7 +13181,7 @@ public final <R> R to(Function<? super Flowable<T>, R> converter) {
return converter.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -4346,7 +4346,7 @@ public final void blockingForEach(Consumer<? super T> onNext) {
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
((Disposable)it).dispose();
throw Exceptions.propagate(e);
throw ExceptionHelper.wrapOrThrow(e);
}
}
}
Expand Down Expand Up @@ -11154,7 +11154,7 @@ public final <R> R to(Function<? super Observable<T>, R> converter) {
return converter.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

Expand Down
13 changes: 12 additions & 1 deletion src/main/java/io/reactivex/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.plugins.RxJavaPlugins;

/**
Expand All @@ -37,6 +38,16 @@ public abstract class Scheduler {
Long.getLong("rx2.scheduler.drift-tolerance", 15));
}

/**
* Returns the clock drift tolerance in nanoseconds.
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
* @return the tolerance in nanoseconds
* @since 2.0
*/
public static long clockDriftTolerance() {
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
}


/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
Expand Down Expand Up @@ -294,7 +305,7 @@ public void run() {
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import io.reactivex.internal.operators.flowable.*;
import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.subscribers.single.*;
import io.reactivex.internal.util.ErrorMode;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.TestSubscriber;
Expand Down Expand Up @@ -249,7 +249,6 @@ public static <T> Flowable<T> concat(
* @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable}
* @return the new Single instance
* @see FlowableOnSubscribe
* @see FlowableEmitter.BackpressureMode
* @see Cancellable
*/
public static <T> Single<T> create(SingleOnSubscribe<T> source) {
Expand Down Expand Up @@ -2366,7 +2365,7 @@ public final <R> R to(Function<? super Single<T>, R> convert) {
return convert.apply(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}

Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/reactivex/disposables/ActionDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
*/
package io.reactivex.disposables;

import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Action;
import io.reactivex.internal.util.ExceptionHelper;

final class ActionDisposable extends ReferenceDisposable<Action> {
/** */
Expand All @@ -28,8 +28,7 @@ protected void onDisposed(Action value) {
try {
value.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw Exceptions.propagate(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.reactivex.exceptions.*;
import io.reactivex.internal.disposables.DisposableContainer;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.OpenHashSet;
import io.reactivex.internal.util.*;

/**
* A disposable container that can hold onto multiple other disposables and
Expand Down Expand Up @@ -54,6 +54,7 @@ public CompositeDisposable(Disposable... resources) {
*/
public CompositeDisposable(Iterable<? extends Disposable> resources) {
ObjectHelper.requireNonNull(resources, "resources is null");
this.resources = new OpenHashSet<Disposable>();
for (Disposable d : resources) {
ObjectHelper.requireNonNull(d, "Disposable item is null");
this.resources.add(d);
Expand Down Expand Up @@ -193,7 +194,8 @@ public int size() {
if (disposed) {
return 0;
}
return resources.size();
OpenHashSet<Disposable> set = resources;
return set != null ? set.size() : 0;
}
}

Expand Down Expand Up @@ -223,7 +225,7 @@ void dispose(OpenHashSet<Disposable> set) {
}
if (errors != null) {
if (errors.size() == 1) {
throw Exceptions.propagate(errors.get(0));
throw ExceptionHelper.wrapOrThrow(errors.get(0));
}
throw new CompositeException(errors);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ public void dispose() {

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
return DisposableHelper.isDisposed(resource.get());
}
}
34 changes: 9 additions & 25 deletions src/main/java/io/reactivex/exceptions/CompositeException.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,7 @@ private void printStackTrace(PrintStreamOrWriter s) {
appendStackTrace(b, ex, "\t");
i++;
}
synchronized (s.lock()) {
s.println(b.toString());
}
s.println(b.toString());
}

private void appendStackTrace(StringBuilder b, Throwable ex, String prefix) {
Expand All @@ -222,9 +220,6 @@ private void appendStackTrace(StringBuilder b, Throwable ex, String prefix) {
}

abstract static class PrintStreamOrWriter {
/** Returns the object to be locked when using this StreamOrWriter */
abstract Object lock();

/** Prints the specified string as a line on this StreamOrWriter */
abstract void println(Object o);
}
Expand All @@ -239,11 +234,6 @@ static final class WrappedPrintStream extends PrintStreamOrWriter {
this.printStream = printStream;
}

@Override
Object lock() {
return printStream;
}

@Override
void println(Object o) {
printStream.println(o);
Expand All @@ -257,11 +247,6 @@ static final class WrappedPrintWriter extends PrintStreamOrWriter {
this.printWriter = printWriter;
}

@Override
Object lock() {
return printWriter;
}

@Override
void println(Object o) {
printWriter.println(o);
Expand Down Expand Up @@ -310,7 +295,7 @@ public int size() {
* any suppressed exceptions.
*/
public boolean isEmpty() {
return exceptions.isEmpty() && getCause() == null;
return exceptions.isEmpty();
}

/**
Expand All @@ -321,16 +306,15 @@ public boolean isEmpty() {
*/
private Throwable getRootCause(Throwable e) {
Throwable root = e.getCause();
if (root == null || root == e) {
if (root == null /* || cause == root */) { // case might not be possible
return e;
} else {
while(true) {
Throwable cause = root.getCause();
if (cause == null || cause == root) {
return root;
}
root = root.getCause();
}
while(true) {
Throwable cause = root.getCause();
if (cause == null /* || cause == root */) { // case might not be possible
return root;
}
root = root.getCause();
}
}
}
10 changes: 3 additions & 7 deletions src/main/java/io/reactivex/exceptions/Exceptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

package io.reactivex.exceptions;

import io.reactivex.internal.util.ExceptionHelper;

/**
* Utility class to help propagate checked exceptions and rethrow exceptions
* designated as fatal.
Expand All @@ -39,13 +41,7 @@ public static RuntimeException propagate(Throwable t) {
* Even though nothing will return and throw via that 'throw', it allows the code to look like it
* so it's easy to read and understand that it will always result in a throw.
*/
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t); // NOPMD
}
throw ExceptionHelper.wrapOrThrow(t);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@
/**
* Indicates that an operator attempted to emit a value but the downstream wasn't ready for it.
*/
public class MissingBackpressureException extends RuntimeException {
public final class MissingBackpressureException extends RuntimeException {
/** */
private static final long serialVersionUID = 8517344746016032542L;

/**
* Constructs a MissingBackpressureException without message or cause.
*/
public MissingBackpressureException() {
super();
// no message
}

/**
Expand All @@ -35,12 +35,4 @@ public MissingBackpressureException(String message) {
super(message);
}

/**
* Constructs a MissingBackpressureException with the given message and cause.
* @param message the error message
* @param cause the cause Throwable
*/
public MissingBackpressureException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.*;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.util.ExceptionHelper;

/**
* A disposable container that can hold onto multiple other disposables.
Expand Down Expand Up @@ -177,7 +178,7 @@ void dispose(List<Disposable> set) {
}
if (errors != null) {
if (errors.size() == 1) {
throw Exceptions.propagate(errors.get(0));
throw ExceptionHelper.wrapOrThrow(errors.get(0));
}
throw new CompositeException(errors);
}
Expand Down
Loading