Skip to content

Commit 9966209

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Fix Generics T[] in Zip & CombineLatest (#4525)
* 2.x: Fix Generics T[] in Zip & CombineLatest * Add Single zipIterableObject test * Add combineLatestObject test to Flowable + Object * Add Javadoc explanation * Add it to Single too
1 parent 44c5705 commit 9966209

File tree

14 files changed

+275
-49
lines changed

14 files changed

+275
-49
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 62 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ public static int bufferSize() {
140140
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
141141
* the source Publishers each time an item is received from any of the source Publishers, where this
142142
* aggregation is defined by a specified function.
143+
* <p>
144+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
145+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
146+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
147+
*
143148
* <dl>
144149
* <dt><b>Backpressure:</b></dt>
145150
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -163,14 +168,19 @@ public static int bufferSize() {
163168
*/
164169
@SchedulerSupport(SchedulerSupport.NONE)
165170
@BackpressureSupport(BackpressureKind.FULL)
166-
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner) {
171+
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner) {
167172
return combineLatest(sources, combiner, bufferSize());
168173
}
169174

170175
/**
171176
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
172177
* the source Publishers each time an item is received from any of the source Publishers, where this
173178
* aggregation is defined by a specified function.
179+
* <p>
180+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
181+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
182+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
183+
*
174184
* <dl>
175185
* <dt><b>Backpressure:</b></dt>
176186
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -194,14 +204,19 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
194204
*/
195205
@SchedulerSupport(SchedulerSupport.NONE)
196206
@BackpressureSupport(BackpressureKind.FULL)
197-
public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R> combiner, Publisher<? extends T>... sources) {
207+
public static <T, R> Flowable<R> combineLatest(Function<? super Object[], ? extends R> combiner, Publisher<? extends T>... sources) {
198208
return combineLatest(sources, combiner, bufferSize());
199209
}
200210

201211
/**
202212
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
203213
* the source Publishers each time an item is received from any of the source Publishers, where this
204214
* aggregation is defined by a specified function.
215+
* <p>
216+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
217+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
218+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
219+
*
205220
* <dl>
206221
* <dt><b>Backpressure:</b></dt>
207222
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -227,7 +242,7 @@ public static <T, R> Flowable<R> combineLatest(Function<? super T[], ? extends R
227242
*/
228243
@SchedulerSupport(SchedulerSupport.NONE)
229244
@BackpressureSupport(BackpressureKind.FULL)
230-
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super T[], ? extends R> combiner, int bufferSize) {
245+
public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources, Function<? super Object[], ? extends R> combiner, int bufferSize) {
231246
ObjectHelper.requireNonNull(sources, "sources is null");
232247
if (sources.length == 0) {
233248
return empty();
@@ -241,6 +256,11 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
241256
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
242257
* the source Publishers each time an item is received from any of the source Publishers, where this
243258
* aggregation is defined by a specified function.
259+
* <p>
260+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
261+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
262+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
263+
*
244264
* <dl>
245265
* <dt><b>Backpressure:</b></dt>
246266
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -265,14 +285,19 @@ public static <T, R> Flowable<R> combineLatest(Publisher<? extends T>[] sources,
265285
@SchedulerSupport(SchedulerSupport.NONE)
266286
@BackpressureSupport(BackpressureKind.FULL)
267287
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
268-
Function<? super T[], ? extends R> combiner) {
288+
Function<? super Object[], ? extends R> combiner) {
269289
return combineLatest(sources, combiner, bufferSize());
270290
}
271291

272292
/**
273293
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
274294
* the source Publishers each time an item is received from any of the source Publishers, where this
275295
* aggregation is defined by a specified function.
296+
* <p>
297+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
298+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
299+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
300+
*
276301
* <dl>
277302
* <dt><b>Backpressure:</b></dt>
278303
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -299,7 +324,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
299324
@SchedulerSupport(SchedulerSupport.NONE)
300325
@BackpressureSupport(BackpressureKind.FULL)
301326
public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? extends T>> sources,
302-
Function<? super T[], ? extends R> combiner, int bufferSize) {
327+
Function<? super Object[], ? extends R> combiner, int bufferSize) {
303328
ObjectHelper.requireNonNull(sources, "sources is null");
304329
ObjectHelper.requireNonNull(combiner, "combiner is null");
305330
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
@@ -310,6 +335,11 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
310335
* Combines a collection of source Publishers by emitting an item that aggregates the latest values of each of
311336
* the source Publishers each time an item is received from any of the source Publishers, where this
312337
* aggregation is defined by a specified function.
338+
* <p>
339+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
340+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
341+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
342+
*
313343
* <dl>
314344
* <dt><b>Backpressure:</b></dt>
315345
* <dd>The returned {@code Publisher} honors backpressure from downstream. The source {@code Publisher}s
@@ -334,7 +364,7 @@ public static <T, R> Flowable<R> combineLatest(Iterable<? extends Publisher<? ex
334364
@SchedulerSupport(SchedulerSupport.NONE)
335365
@BackpressureSupport(BackpressureKind.FULL)
336366
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
337-
Function<? super T[], ? extends R> combiner) {
367+
Function<? super Object[], ? extends R> combiner) {
338368
return combineLatestDelayError(sources, combiner, bufferSize());
339369
}
340370

@@ -343,6 +373,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
343373
* the source Publishers each time an item is received from any of the source Publishers, where this
344374
* aggregation is defined by a specified function and delays any error from the sources until
345375
* all source Publishers terminate.
376+
* <p>
377+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
378+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
379+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
346380
*
347381
* <dl>
348382
* <dt><b>Backpressure:</b></dt>
@@ -367,7 +401,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
367401
*/
368402
@SchedulerSupport(SchedulerSupport.NONE)
369403
@BackpressureSupport(BackpressureKind.FULL)
370-
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
404+
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
371405
Publisher<? extends T>... sources) {
372406
return combineLatestDelayError(sources, combiner, bufferSize());
373407
}
@@ -377,6 +411,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
377411
* the source ObservableSources each time an item is received from any of the source Publisher, where this
378412
* aggregation is defined by a specified function and delays any error from the sources until
379413
* all source Publishers terminate.
414+
* <p>
415+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
416+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
417+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
380418
*
381419
* <dl>
382420
* <dt><b>Scheduler:</b></dt>
@@ -398,7 +436,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
398436
* @see <a href="http://reactivex.io/documentation/operators/combinelatest.html">ReactiveX operators documentation: CombineLatest</a>
399437
*/
400438
@SchedulerSupport(SchedulerSupport.NONE)
401-
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ? extends R> combiner,
439+
public static <T, R> Flowable<R> combineLatestDelayError(Function<? super Object[], ? extends R> combiner,
402440
int bufferSize, Publisher<? extends T>... sources) {
403441
return combineLatestDelayError(sources, combiner, bufferSize);
404442
}
@@ -408,6 +446,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
408446
* the source Publishers each time an item is received from any of the source Publishers, where this
409447
* aggregation is defined by a specified function and delays any error from the sources until
410448
* all source Publishers terminate.
449+
* <p>
450+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
451+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
452+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
411453
*
412454
* <dl>
413455
* <dt><b>Backpressure:</b></dt>
@@ -435,7 +477,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Function<? super T[], ?
435477
@SchedulerSupport(SchedulerSupport.NONE)
436478
@BackpressureSupport(BackpressureKind.FULL)
437479
public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[] sources,
438-
Function<? super T[], ? extends R> combiner, int bufferSize) {
480+
Function<? super Object[], ? extends R> combiner, int bufferSize) {
439481
ObjectHelper.requireNonNull(sources, "sources is null");
440482
ObjectHelper.requireNonNull(combiner, "combiner is null");
441483
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
@@ -450,6 +492,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
450492
* the source Publishers each time an item is received from any of the source Publishers, where this
451493
* aggregation is defined by a specified function and delays any error from the sources until
452494
* all source Publishers terminate.
495+
* <p>
496+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
497+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
498+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
453499
*
454500
* <dl>
455501
* <dt><b>Backpressure:</b></dt>
@@ -475,7 +521,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Publisher<? extends T>[
475521
@SchedulerSupport(SchedulerSupport.NONE)
476522
@BackpressureSupport(BackpressureKind.FULL)
477523
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
478-
Function<? super T[], ? extends R> combiner) {
524+
Function<? super Object[], ? extends R> combiner) {
479525
return combineLatestDelayError(sources, combiner, bufferSize());
480526
}
481527

@@ -484,6 +530,10 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
484530
* the source Publishers each time an item is received from any of the source Publishers, where this
485531
* aggregation is defined by a specified function and delays any error from the sources until
486532
* all source Publishers terminate.
533+
* <p>
534+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
535+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
536+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
487537
*
488538
* <dl>
489539
* <dt><b>Backpressure:</b></dt>
@@ -511,7 +561,7 @@ public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publ
511561
@SchedulerSupport(SchedulerSupport.NONE)
512562
@BackpressureSupport(BackpressureKind.FULL)
513563
public static <T, R> Flowable<R> combineLatestDelayError(Iterable<? extends Publisher<? extends T>> sources,
514-
Function<? super T[], ? extends R> combiner, int bufferSize) {
564+
Function<? super Object[], ? extends R> combiner, int bufferSize) {
515565
ObjectHelper.requireNonNull(sources, "sources is null");
516566
ObjectHelper.requireNonNull(combiner, "combiner is null");
517567
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
@@ -14955,4 +15005,4 @@ public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // No
1495515005
return ts;
1495615006
}
1495715007

14958-
}
15008+
}

src/main/java/io/reactivex/Maybe.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1343,6 +1343,11 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
13431343
* Returns a Maybe that emits the results of a specified combiner function applied to combinations of
13441344
* items emitted, in sequence, by an Iterable of other MaybeSources.
13451345
* <p>
1346+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
1347+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
1348+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
1349+
*
1350+
* <p>
13461351
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
13471352
* <p>This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This
13481353
* also means it is possible some sources may not get subscribed to at all.
@@ -1362,7 +1367,7 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
13621367
* @see <a href="http://reactivex.io/documentation/operators/zip.html">ReactiveX operators documentation: Zip</a>
13631368
*/
13641369
@SchedulerSupport(SchedulerSupport.NONE)
1365-
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super T[], ? extends R> zipper) {
1370+
public static <T, R> Maybe<R> zip(Iterable<? extends MaybeSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
13661371
ObjectHelper.requireNonNull(zipper, "zipper is null");
13671372
ObjectHelper.requireNonNull(sources, "sources is null");
13681373
return RxJavaPlugins.onAssembly(new MaybeZipIterable<T, R>(sources, zipper));
@@ -1774,6 +1779,11 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Maybe<R> zip(
17741779
* Returns a Maybe that emits the results of a specified combiner function applied to combinations of
17751780
* items emitted, in sequence, by an array of other MaybeSources.
17761781
* <p>
1782+
* Note on method signature: since Java doesn't allow creating a generic array with {@code new T[]}, the
1783+
* implementation of this operator has to create an {@code Object[]} instead. Unfortunately, a
1784+
* {@code Function<Integer[], R>} passed to the method would trigger a {@code ClassCastException}.
1785+
*
1786+
* <p>
17771787
* <img width="640" height="380" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/zip.png" alt="">
17781788
* <p>This operator terminates eagerly if any of the source MaybeSources signal an onError or onComplete. This
17791789
* also means it is possible some sources may not get subscribed to at all.
@@ -1796,6 +1806,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Maybe<R> zip(
17961806
@SchedulerSupport(SchedulerSupport.NONE)
17971807
public static <T, R> Maybe<R> zipArray(Function<? super Object[], ? extends R> zipper,
17981808
MaybeSource<? extends T>... sources) {
1809+
ObjectHelper.requireNonNull(sources, "sources is null");
17991810
if (sources.length == 0) {
18001811
return empty();
18011812
}

0 commit comments

Comments
 (0)