Skip to content

2.x: coverage, bugfixes, 9/03-1 #4468

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1331,15 +1331,15 @@ public final Completable startWith(CompletableSource other) {
}

/**
* Returns an NbpObservable which first delivers the events
* of the other NbpObservable then runs this CompletableConsumable.
* Returns an Observable which first delivers the events
* of the other Observable then runs this CompletableConsumable.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @param other the other NbpObservable to run first
* @return the new NbpObservable instance
* @param other the other Observable to run first
* @return the new Observable instance
* @throws NullPointerException if other is null
*/
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -1641,14 +1641,14 @@ public final <T> Maybe<T> toMaybe() {
}

/**
* Returns an NbpObservable which when subscribed to subscribes to this Completable and
* Returns an Observable which when subscribed to subscribes to this Completable and
* relays the terminal events to the subscriber.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code toObservable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
* @param <T> the value type
* @return the new NbpObservable created
* @return the new Observable created
*/
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Observable<T> toObservable() {
Expand Down
220 changes: 164 additions & 56 deletions src/main/java/io/reactivex/Flowable.java

Large diffs are not rendered by default.

42 changes: 33 additions & 9 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[]
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
Function<? super T[], ? extends R> combiner, int bufferSize) {
ObjectHelper.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
}
ObjectHelper.requireNonNull(combiner, "combiner is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");

Expand Down Expand Up @@ -954,7 +957,7 @@ public static <T> Observable<T> concat(
* Note: named this way because of overload conflict with concat(ObservableSource&lt;ObservableSource&gt;)
* @param sources the array of sources
* @param <T> the common base value type
* @return the new NbpObservable instance
* @return the new Observable instance
* @throws NullPointerException if sources is null
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
Expand Down Expand Up @@ -1832,9 +1835,16 @@ public static Observable<Long> intervalRange(long start, long count, long initia
*/
@SchedulerSupport(SchedulerSupport.CUSTOM)
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {

if (count < 0) {
throw new IllegalArgumentException("count >= 0 required but it was " + count);
}

if (count == 0L) {
return Observable.<Long>empty().delay(initialDelay, unit, scheduler);
}

long end = start + (count - 1);
if (end < 0) {
if (start > 0 && end < 0) {
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
}
ObjectHelper.requireNonNull(unit, "unit is null");
Expand Down Expand Up @@ -9818,11 +9828,25 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T

@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");

observer = RxJavaPlugins.onSubscribe(this, observer);

subscribeActual(observer);
ObjectHelper.requireNonNull(observer, "s is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);

ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);

NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}

/**
Expand Down Expand Up @@ -10151,7 +10175,7 @@ public final Observable<T> takeFirst(Predicate<? super T> predicate) {
@SchedulerSupport(SchedulerSupport.NONE)
public final Observable<T> takeLast(int count) {
if (count < 0) {
throw new IndexOutOfBoundsException("count >= required but it was " + count);
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
} else
if (count == 0) {
return ignoreElements();
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/internal/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ private Functions() {
}

@SuppressWarnings("unchecked")
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> biFunction) {
ObjectHelper.requireNonNull(biFunction, "biFunction is null");
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
ObjectHelper.requireNonNull(f, "f is null");
return new Function<Object[], R>() {
@Override
public R apply(Object[] a) throws Exception {
if (a.length != 2) {
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
}
return ((BiFunction<Object, Object, R>)biFunction).apply(a[0], a[1]);
return ((BiFunction<Object, Object, R>)f).apply(a[0], a[1]);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ static final class PublisherBufferExactSubscriber<T, C extends Collection<? supe
Subscription s;

boolean done;

int index;

public PublisherBufferExactSubscriber(Subscriber<? super C> actual, int size, Callable<C> bufferSupplier) {
this.actual = actual;
Expand Down Expand Up @@ -133,10 +135,14 @@ public void onNext(T t) {
}

b.add(t);

if (b.size() == size) {

int i = index + 1;
if (i == size) {
index = 0;
buffer = null;
actual.onNext(b);
} else {
index = i;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ public final class FlowableCombineLatest<T, R>

final Iterable<? extends Publisher<? extends T>> iterable;

final Function<Object[], ? extends R> combiner;
final Function<? super T[], ? extends R> combiner;

final int bufferSize;

final boolean delayErrors;

public FlowableCombineLatest(Publisher<? extends T>[] array,
Function<Object[], ? extends R> combiner,
Function<? super T[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Expand All @@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
}

public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
Function<Object[], ? extends R> combiner,
Function<? super T[], ? extends R> combiner,
int bufferSize, boolean delayErrors) {
if (bufferSize <= 0) {
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
Expand Down Expand Up @@ -150,7 +150,7 @@ public void subscribeActual(Subscriber<? super R> s) {
new FlowableMap<T, R>((Publisher<T>)a[0], new Function<T, R>() {
@Override
public R apply(T t) throws Exception {
return combiner.apply(new Object[] { t });
return combiner.apply((T[])new Object[] { t });
}
}).subscribe(s);
return;
Expand All @@ -173,7 +173,7 @@ static final class CombineLatestCoordinator<T, R>

final Subscriber<? super R> actual;

final Function<Object[], ? extends R> combiner;
final Function<? super T[], ? extends R> combiner;

final CombineLatestInnerSubscriber<T>[] subscribers;

Expand All @@ -198,7 +198,7 @@ static final class CombineLatestCoordinator<T, R>
final AtomicReference<Throwable> error;

public CombineLatestCoordinator(Subscriber<? super R> actual,
Function<Object[], ? extends R> combiner, int n,
Function<? super T[], ? extends R> combiner, int n,
int bufferSize, boolean delayErrors) {
this.actual = actual;
this.combiner = combiner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,87 +273,88 @@ public void drain() {

if (inner != null) {
SimpleQueue<R> q = inner.queue();

while (e != r) {
if (cancelled) {
cancelAll();
return;
}

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
if (ex != null) {
current = null;
inner.cancel();
if (q != null) {
while (e != r) {
if (cancelled) {
cancelAll();

a.onError(ex);
return;
}
}

boolean d = inner.isDone();

R v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
current = null;
inner.cancel();
cancelAll();
a.onError(ex);
return;
}

boolean empty = v == null;

if (d && empty) {
inner = null;
current = null;
s.request(1);
continue outer;
}

if (empty) {
break;
}

a.onNext(v);

e++;

inner.requestOne();
}

if (e == r) {
if (cancelled) {
cancelAll();
return;
}

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
if (ex != null) {

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
if (ex != null) {
current = null;
inner.cancel();
cancelAll();

a.onError(ex);
return;
}
}

boolean d = inner.isDone();

R v;

try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
current = null;
inner.cancel();
cancelAll();

a.onError(ex);
return;
}

boolean empty = v == null;

if (d && empty) {
inner = null;
current = null;
s.request(1);
continue outer;
}

if (empty) {
break;
}

a.onNext(v);

e++;

inner.requestOne();
}

boolean d = inner.isDone();

boolean empty = inner.queue().isEmpty();

if (d && empty) {
inner = null;
current = null;
s.request(1);
continue;

if (e == r) {
if (cancelled) {
cancelAll();
return;
}

if (em == ErrorMode.IMMEDIATE) {
Throwable ex = error.get();
if (ex != null) {
current = null;
inner.cancel();
cancelAll();

a.onError(ex);
return;
}
}

boolean d = inner.isDone();

boolean empty = q.isEmpty();

if (d && empty) {
inner = null;
current = null;
s.request(1);
continue;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ void drain() {
e++;
}

if (e == r) {
boolean d = done;
boolean empty = q.isEmpty();

if (checkTerminated(d, empty, a)) {
return;
}
}

if (e != 0L) {
if (r != Long.MAX_VALUE) {
requested.addAndGet(-e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
* @param unit the unit of measure of the age amount
* @param scheduler the target scheduler providing the current time
* @param bufferSize the maximum number of elements to hold
* @return the new NbpConnectableObservable instance
* @return the new ConnectableFlowable instance
*/
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {
Expand Down
Loading