Skip to content

Commit 92d1494

Browse files
committed
2.x: coverage, bugfixes, 9/03-1
1 parent b15df98 commit 92d1494

File tree

89 files changed

+3352
-488
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+3352
-488
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1331,15 +1331,15 @@ public final Completable startWith(CompletableSource other) {
13311331
}
13321332

13331333
/**
1334-
* Returns an NbpObservable which first delivers the events
1335-
* of the other NbpObservable then runs this CompletableConsumable.
1334+
* Returns an Observable which first delivers the events
1335+
* of the other Observable then runs this CompletableConsumable.
13361336
* <dl>
13371337
* <dt><b>Scheduler:</b></dt>
13381338
* <dd>{@code startWith} does not operate by default on a particular {@link Scheduler}.</dd>
13391339
* </dl>
13401340
* @param <T> the value type
1341-
* @param other the other NbpObservable to run first
1342-
* @return the new NbpObservable instance
1341+
* @param other the other Observable to run first
1342+
* @return the new Observable instance
13431343
* @throws NullPointerException if other is null
13441344
*/
13451345
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1641,14 +1641,14 @@ public final <T> Maybe<T> toMaybe() {
16411641
}
16421642

16431643
/**
1644-
* Returns an NbpObservable which when subscribed to subscribes to this Completable and
1644+
* Returns an Observable which when subscribed to subscribes to this Completable and
16451645
* relays the terminal events to the subscriber.
16461646
* <dl>
16471647
* <dt><b>Scheduler:</b></dt>
16481648
* <dd>{@code toObservable} does not operate by default on a particular {@link Scheduler}.</dd>
16491649
* </dl>
16501650
* @param <T> the value type
1651-
* @return the new NbpObservable created
1651+
* @return the new Observable created
16521652
*/
16531653
@SchedulerSupport(SchedulerSupport.NONE)
16541654
public final <T> Observable<T> toObservable() {

src/main/java/io/reactivex/Flowable.java

Lines changed: 164 additions & 56 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,9 @@ public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[]
269269
public static <T, R> Observable<R> combineLatest(ObservableSource<? extends T>[] sources,
270270
Function<? super T[], ? extends R> combiner, int bufferSize) {
271271
ObjectHelper.requireNonNull(sources, "sources is null");
272+
if (sources.length == 0) {
273+
return empty();
274+
}
272275
ObjectHelper.requireNonNull(combiner, "combiner is null");
273276
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
274277

@@ -954,7 +957,7 @@ public static <T> Observable<T> concat(
954957
* Note: named this way because of overload conflict with concat(ObservableSource&lt;ObservableSource&gt;)
955958
* @param sources the array of sources
956959
* @param <T> the common base value type
957-
* @return the new NbpObservable instance
960+
* @return the new Observable instance
958961
* @throws NullPointerException if sources is null
959962
*/
960963
@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -1832,9 +1835,16 @@ public static Observable<Long> intervalRange(long start, long count, long initia
18321835
*/
18331836
@SchedulerSupport(SchedulerSupport.CUSTOM)
18341837
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
1835-
1838+
if (count < 0) {
1839+
throw new IllegalArgumentException("count >= 0 required but it was " + count);
1840+
}
1841+
1842+
if (count == 0L) {
1843+
return Observable.<Long>empty().delay(initialDelay, unit, scheduler);
1844+
}
1845+
18361846
long end = start + (count - 1);
1837-
if (end < 0) {
1847+
if (start > 0 && end < 0) {
18381848
throw new IllegalArgumentException("Overflow! start + count is bigger than Long.MAX_VALUE");
18391849
}
18401850
ObjectHelper.requireNonNull(unit, "unit is null");
@@ -9818,11 +9828,25 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
98189828

98199829
@Override
98209830
public final void subscribe(Observer<? super T> observer) {
9821-
ObjectHelper.requireNonNull(observer, "observer is null");
9822-
9823-
observer = RxJavaPlugins.onSubscribe(this, observer);
9824-
9825-
subscribeActual(observer);
9831+
ObjectHelper.requireNonNull(observer, "s is null");
9832+
try {
9833+
observer = RxJavaPlugins.onSubscribe(this, observer);
9834+
9835+
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
9836+
9837+
subscribeActual(observer);
9838+
} catch (NullPointerException e) { // NOPMD
9839+
throw e;
9840+
} catch (Throwable e) {
9841+
Exceptions.throwIfFatal(e);
9842+
// can't call onError because no way to know if a Disposable has been set or not
9843+
// can't call onSubscribe because the call might have set a Subscription already
9844+
RxJavaPlugins.onError(e);
9845+
9846+
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
9847+
npe.initCause(e);
9848+
throw npe;
9849+
}
98269850
}
98279851

98289852
/**
@@ -10151,7 +10175,7 @@ public final Observable<T> takeFirst(Predicate<? super T> predicate) {
1015110175
@SchedulerSupport(SchedulerSupport.NONE)
1015210176
public final Observable<T> takeLast(int count) {
1015310177
if (count < 0) {
10154-
throw new IndexOutOfBoundsException("count >= required but it was " + count);
10178+
throw new IndexOutOfBoundsException("count >= 0 required but it was " + count);
1015510179
} else
1015610180
if (count == 0) {
1015710181
return ignoreElements();

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,15 @@ private Functions() {
3131
}
3232

3333
@SuppressWarnings("unchecked")
34-
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> biFunction) {
35-
ObjectHelper.requireNonNull(biFunction, "biFunction is null");
34+
public static <T1, T2, R> Function<Object[], R> toFunction(final BiFunction<? super T1, ? super T2, ? extends R> f) {
35+
ObjectHelper.requireNonNull(f, "f is null");
3636
return new Function<Object[], R>() {
3737
@Override
3838
public R apply(Object[] a) throws Exception {
3939
if (a.length != 2) {
4040
throw new IllegalArgumentException("Array of size 2 expected but got " + a.length);
4141
}
42-
return ((BiFunction<Object, Object, R>)biFunction).apply(a[0], a[1]);
42+
return ((BiFunction<Object, Object, R>)f).apply(a[0], a[1]);
4343
}
4444
};
4545
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableBuffer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ static final class PublisherBufferExactSubscriber<T, C extends Collection<? supe
7777
Subscription s;
7878

7979
boolean done;
80+
81+
int index;
8082

8183
public PublisherBufferExactSubscriber(Subscriber<? super C> actual, int size, Callable<C> bufferSupplier) {
8284
this.actual = actual;
@@ -133,10 +135,14 @@ public void onNext(T t) {
133135
}
134136

135137
b.add(t);
136-
137-
if (b.size() == size) {
138+
139+
int i = index + 1;
140+
if (i == size) {
141+
index = 0;
138142
buffer = null;
139143
actual.onNext(b);
144+
} else {
145+
index = i;
140146
}
141147
}
142148

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,14 @@ public final class FlowableCombineLatest<T, R>
4040

4141
final Iterable<? extends Publisher<? extends T>> iterable;
4242

43-
final Function<Object[], ? extends R> combiner;
43+
final Function<? super T[], ? extends R> combiner;
4444

4545
final int bufferSize;
4646

4747
final boolean delayErrors;
4848

4949
public FlowableCombineLatest(Publisher<? extends T>[] array,
50-
Function<Object[], ? extends R> combiner,
50+
Function<? super T[], ? extends R> combiner,
5151
int bufferSize, boolean delayErrors) {
5252
if (bufferSize <= 0) {
5353
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
@@ -61,7 +61,7 @@ public FlowableCombineLatest(Publisher<? extends T>[] array,
6161
}
6262

6363
public FlowableCombineLatest(Iterable<? extends Publisher<? extends T>> iterable,
64-
Function<Object[], ? extends R> combiner,
64+
Function<? super T[], ? extends R> combiner,
6565
int bufferSize, boolean delayErrors) {
6666
if (bufferSize <= 0) {
6767
throw new IllegalArgumentException("BUFFER_SIZE > 0 required but it was " + bufferSize);
@@ -150,7 +150,7 @@ public void subscribeActual(Subscriber<? super R> s) {
150150
new FlowableMap<T, R>((Publisher<T>)a[0], new Function<T, R>() {
151151
@Override
152152
public R apply(T t) throws Exception {
153-
return combiner.apply(new Object[] { t });
153+
return combiner.apply((T[])new Object[] { t });
154154
}
155155
}).subscribe(s);
156156
return;
@@ -173,7 +173,7 @@ static final class CombineLatestCoordinator<T, R>
173173

174174
final Subscriber<? super R> actual;
175175

176-
final Function<Object[], ? extends R> combiner;
176+
final Function<? super T[], ? extends R> combiner;
177177

178178
final CombineLatestInnerSubscriber<T>[] subscribers;
179179

@@ -198,7 +198,7 @@ static final class CombineLatestCoordinator<T, R>
198198
final AtomicReference<Throwable> error;
199199

200200
public CombineLatestCoordinator(Subscriber<? super R> actual,
201-
Function<Object[], ? extends R> combiner, int n,
201+
Function<? super T[], ? extends R> combiner, int n,
202202
int bufferSize, boolean delayErrors) {
203203
this.actual = actual;
204204
this.combiner = combiner;

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,15 @@ void drain() {
179179
e++;
180180
}
181181

182+
if (e == r) {
183+
boolean d = done;
184+
boolean empty = q.isEmpty();
185+
186+
if (checkTerminated(d, empty, a)) {
187+
return;
188+
}
189+
}
190+
182191
if (e != 0L) {
183192
if (r != Long.MAX_VALUE) {
184193
requested.addAndGet(-e);

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public static <T> ConnectableFlowable<T> create(Flowable<T> source,
180180
* @param unit the unit of measure of the age amount
181181
* @param scheduler the target scheduler providing the current time
182182
* @param bufferSize the maximum number of elements to hold
183-
* @return the new NbpConnectableObservable instance
183+
* @return the new ConnectableFlowable instance
184184
*/
185185
public static <T> ConnectableFlowable<T> create(Flowable<T> source,
186186
final long maxAge, final TimeUnit unit, final Scheduler scheduler, final int bufferSize) {

0 commit comments

Comments
 (0)