Skip to content

Commit 2fa773c

Browse files
authored
2.x: add missing null checks on values returned by user functions (#5379)
1 parent 815c5d2 commit 2fa773c

35 files changed

+315
-31
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCombineLatest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -474,7 +474,7 @@ public R poll() throws Exception {
474474
return null;
475475
}
476476
T[] a = (T[])queue.poll();
477-
R r = combiner.apply(a);
477+
R r = ObjectHelper.requireNonNull(combiner.apply(a), "The combiner returned a null value");
478478
((CombineLatestInnerSubscriber<T>)e).requestOne();
479479
return r;
480480
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import io.reactivex.*;
2121
import io.reactivex.flowables.ConnectableFlowable;
2222
import io.reactivex.functions.*;
23-
import io.reactivex.internal.functions.Functions;
23+
import io.reactivex.internal.functions.*;
2424

2525
/**
2626
* Helper utility class to support Flowable with inner classes.
@@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, Publisher<T>>
7777

7878
@Override
7979
public Publisher<T> apply(final T v) throws Exception {
80-
return new FlowableTakePublisher<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
80+
Publisher<U> p = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null Publisher");
81+
return new FlowableTakePublisher<U>(p, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
8182
}
8283
}
8384

@@ -164,7 +165,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Publ
164165
@Override
165166
public Publisher<R> apply(final T t) throws Exception {
166167
@SuppressWarnings("unchecked")
167-
Publisher<U> u = (Publisher<U>)mapper.apply(t);
168+
Publisher<U> u = (Publisher<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Publisher");
168169
return new FlowableMapPublisher<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
169170
}
170171
}
@@ -184,7 +185,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, Publisher<U>
184185

185186
@Override
186187
public Publisher<U> apply(T t) throws Exception {
187-
return new FlowableFromIterable<U>(mapper.apply(t));
188+
return new FlowableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
188189
}
189190
}
190191

@@ -317,7 +318,8 @@ static final class ReplayFunction<T, R> implements Function<Flowable<T>, Publish
317318

318319
@Override
319320
public Publisher<R> apply(Flowable<T> t) throws Exception {
320-
return Flowable.fromPublisher(selector.apply(t)).observeOn(scheduler);
321+
Publisher<R> p = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Publisher");
322+
return Flowable.fromPublisher(p).observeOn(scheduler);
321323
}
322324
}
323325
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableMapNotification.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import org.reactivestreams.Subscriber;
1919

2020
import io.reactivex.Flowable;
21-
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.exceptions.*;
2222
import io.reactivex.functions.Function;
2323
import io.reactivex.internal.functions.ObjectHelper;
2424
import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber;
@@ -87,7 +87,7 @@ public void onError(Throwable t) {
8787
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError publisher returned is null");
8888
} catch (Throwable e) {
8989
Exceptions.throwIfFatal(e);
90-
actual.onError(e);
90+
actual.onError(new CompositeException(t, e));
9191
return;
9292
}
9393

src/main/java/io/reactivex/internal/operators/flowable/FlowableUsing.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.*;
2222
import io.reactivex.exceptions.*;
2323
import io.reactivex.functions.*;
24+
import io.reactivex.internal.functions.ObjectHelper;
2425
import io.reactivex.internal.subscriptions.*;
2526
import io.reactivex.plugins.RxJavaPlugins;
2627

@@ -54,7 +55,7 @@ public void subscribeActual(Subscriber<? super T> s) {
5455

5556
Publisher<? extends T> source;
5657
try {
57-
source = sourceSupplier.apply(resource);
58+
source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null Publisher");
5859
} catch (Throwable e) {
5960
Exceptions.throwIfFatal(e);
6061
try {

src/main/java/io/reactivex/internal/operators/flowable/FlowableWithLatestFromMany.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void onNext(T t) {
168168
R v;
169169

170170
try {
171-
v = ObjectHelper.requireNonNull(combiner.apply(objects), "combiner returned a null value");
171+
v = ObjectHelper.requireNonNull(combiner.apply(objects), "The combiner returned a null value");
172172
} catch (Throwable ex) {
173173
Exceptions.throwIfFatal(ex);
174174
cancel();
@@ -297,7 +297,7 @@ public void dispose() {
297297
final class SingletonArrayFunc implements Function<T, R> {
298298
@Override
299299
public R apply(T t) throws Exception {
300-
return combiner.apply(new Object[] { t });
300+
return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value");
301301
}
302302
}
303303
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,7 @@ public void onComplete() {
192192
final class SingletonArrayFunc implements Function<T, R> {
193193
@Override
194194
public R apply(T t) throws Exception {
195-
return zipper.apply(new Object[] { t });
195+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
196196
}
197197
}
198198
}

src/main/java/io/reactivex/internal/operators/maybe/MaybeZipIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.Function;
2121
import io.reactivex.internal.disposables.EmptyDisposable;
22+
import io.reactivex.internal.functions.ObjectHelper;
2223
import io.reactivex.internal.operators.maybe.MaybeZipArray.ZipCoordinator;
2324

2425
public final class MaybeZipIterable<T, R> extends Maybe<R> {
@@ -81,7 +82,7 @@ protected void subscribeActual(MaybeObserver<? super R> observer) {
8182
final class SingletonArrayFunc implements Function<T, R> {
8283
@Override
8384
public R apply(T t) throws Exception {
84-
return zipper.apply(new Object[] { t });
85+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
8586
}
8687
}
8788
}

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ static final class ItemDelayFunction<T, U> implements Function<T, ObservableSour
7777

7878
@Override
7979
public ObservableSource<T> apply(final T v) throws Exception {
80-
return new ObservableTake<U>(itemDelay.apply(v), 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
80+
ObservableSource<U> o = ObjectHelper.requireNonNull(itemDelay.apply(v), "The itemDelay returned a null ObservableSource");
81+
return new ObservableTake<U>(o, 1).map(Functions.justFunction(v)).defaultIfEmpty(v);
8182
}
8283
}
8384

@@ -165,7 +166,7 @@ static final class FlatMapWithCombinerOuter<T, R, U> implements Function<T, Obse
165166
@Override
166167
public ObservableSource<R> apply(final T t) throws Exception {
167168
@SuppressWarnings("unchecked")
168-
ObservableSource<U> u = (ObservableSource<U>)mapper.apply(t);
169+
ObservableSource<U> u = (ObservableSource<U>)ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
169170
return new ObservableMap<U, R>(u, new FlatMapWithCombinerInner<U, R, T>(combiner, t));
170171
}
171172
}
@@ -185,7 +186,7 @@ static final class FlatMapIntoIterable<T, U> implements Function<T, ObservableSo
185186

186187
@Override
187188
public ObservableSource<U> apply(T t) throws Exception {
188-
return new ObservableFromIterable<U>(mapper.apply(t));
189+
return new ObservableFromIterable<U>(ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null Iterable"));
189190
}
190191
}
191192

@@ -319,7 +320,7 @@ static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {
319320
@Override
320321
public Observable<R> apply(T t) throws Exception {
321322
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(
322-
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null value")));
323+
ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null SingleSource")));
323324
}
324325

325326
}
@@ -403,7 +404,8 @@ static final class ReplayFunction<T, R> implements Function<Observable<T>, Obser
403404

404405
@Override
405406
public ObservableSource<R> apply(Observable<T> t) throws Exception {
406-
return Observable.wrap(selector.apply(t)).observeOn(scheduler);
407+
ObservableSource<R> apply = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null ObservableSource");
408+
return Observable.wrap(apply).observeOn(scheduler);
407409
}
408410
}
409411
}

src/main/java/io/reactivex/internal/operators/observable/ObservableMapNotification.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16-
import io.reactivex.internal.functions.ObjectHelper;
1716
import java.util.concurrent.Callable;
1817

1918
import io.reactivex.*;
2019
import io.reactivex.disposables.Disposable;
21-
import io.reactivex.exceptions.Exceptions;
20+
import io.reactivex.exceptions.*;
2221
import io.reactivex.functions.Function;
2322
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.internal.functions.ObjectHelper;
2424

2525
public final class ObservableMapNotification<T, R> extends AbstractObservableWithUpstream<T, ObservableSource<? extends R>> {
2626

@@ -106,7 +106,7 @@ public void onError(Throwable t) {
106106
p = ObjectHelper.requireNonNull(onErrorMapper.apply(t), "The onError ObservableSource returned is null");
107107
} catch (Throwable e) {
108108
Exceptions.throwIfFatal(e);
109-
actual.onError(e);
109+
actual.onError(new CompositeException(t, e));
110110
return;
111111
}
112112

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.reactivex.exceptions.Exceptions;
2525
import io.reactivex.functions.*;
2626
import io.reactivex.internal.disposables.*;
27+
import io.reactivex.internal.functions.ObjectHelper;
2728
import io.reactivex.internal.fuseable.HasUpstreamObservableSource;
2829
import io.reactivex.internal.util.*;
2930
import io.reactivex.observables.ConnectableObservable;
@@ -1026,8 +1027,8 @@ protected void subscribeActual(Observer<? super R> child) {
10261027
ConnectableObservable<U> co;
10271028
ObservableSource<R> observable;
10281029
try {
1029-
co = connectableFactory.call();
1030-
observable = selector.apply(co);
1030+
co = ObjectHelper.requireNonNull(connectableFactory.call(), "The connectableFactory returned a null ConnectableObservable");
1031+
observable = ObjectHelper.requireNonNull(selector.apply(co), "The selector returned a null ObservableSource");
10311032
} catch (Throwable e) {
10321033
Exceptions.throwIfFatal(e);
10331034
EmptyDisposable.error(e, child);

src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.reactivex.exceptions.*;
2222
import io.reactivex.functions.*;
2323
import io.reactivex.internal.disposables.*;
24+
import io.reactivex.internal.functions.ObjectHelper;
2425
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
public final class ObservableUsing<T, D> extends Observable<T> {
@@ -53,7 +54,7 @@ public void subscribeActual(Observer<? super T> s) {
5354

5455
ObservableSource<? extends T> source;
5556
try {
56-
source = sourceSupplier.apply(resource);
57+
source = ObjectHelper.requireNonNull(sourceSupplier.apply(resource), "The sourceSupplier returned a null ObservableSource");
5758
} catch (Throwable e) {
5859
Exceptions.throwIfFatal(e);
5960
try {

src/main/java/io/reactivex/internal/operators/observable/ObservableWithLatestFromMany.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ public void dispose() {
286286
final class SingletonArrayFunc implements Function<T, R> {
287287
@Override
288288
public R apply(T t) throws Exception {
289-
return combiner.apply(new Object[] { t });
289+
return ObjectHelper.requireNonNull(combiner.apply(new Object[] { t }), "The combiner returned a null value");
290290
}
291291
}
292292
}

src/main/java/io/reactivex/internal/operators/single/SingleZipArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ public void onError(Throwable e) {
181181
final class SingletonArrayFunc implements Function<T, R> {
182182
@Override
183183
public R apply(T t) throws Exception {
184-
return zipper.apply(new Object[] { t });
184+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
185185
}
186186
}
187187
}

src/main/java/io/reactivex/internal/operators/single/SingleZipIterable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.Function;
2121
import io.reactivex.internal.disposables.EmptyDisposable;
22+
import io.reactivex.internal.functions.ObjectHelper;
2223
import io.reactivex.internal.operators.single.SingleZipArray.ZipCoordinator;
2324

2425
public final class SingleZipIterable<T, R> extends Single<R> {
@@ -81,7 +82,7 @@ protected void subscribeActual(SingleObserver<? super R> observer) {
8182
final class SingletonArrayFunc implements Function<T, R> {
8283
@Override
8384
public R apply(T t) throws Exception {
84-
return zipper.apply(new Object[] { t });
85+
return ObjectHelper.requireNonNull(zipper.apply(new Object[] { t }), "The zipper returned a null value");
8586
}
8687
}
8788
}

src/main/java/io/reactivex/observers/TestObserver.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public void onNext(T t) {
142142
} catch (Throwable ex) {
143143
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
144144
errors.add(ex);
145+
qs.dispose();
145146
}
146147
return;
147148
}

src/main/java/io/reactivex/subscribers/TestSubscriber.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ public void onNext(T t) {
203203
} catch (Throwable ex) {
204204
// Exceptions.throwIfFatal(e); TODO add fatal exceptions?
205205
errors.add(ex);
206+
qs.cancel();
206207
}
207208
return;
208209
}

src/test/java/io/reactivex/flowable/FlowableNullTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,6 +1341,7 @@ public Publisher<Integer> call() {
13411341
}
13421342

13431343
@Test(expected = NullPointerException.class)
1344+
@Ignore("No longer crashes with NPE but signals it; tested elsewhere.")
13441345
public void flatMapNotificationOnErrorReturnsNull() {
13451346
Flowable.error(new TestException()).flatMap(new Function<Object, Publisher<Integer>>() {
13461347
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.exceptions.*;
3131
import io.reactivex.functions.*;
3232
import io.reactivex.internal.functions.Functions;
33+
import io.reactivex.internal.fuseable.QueueFuseable;
3334
import io.reactivex.internal.operators.flowable.FlowableZipTest.ArgsToString;
3435
import io.reactivex.plugins.RxJavaPlugins;
3536
import io.reactivex.processors.PublishProcessor;
@@ -1551,4 +1552,21 @@ public Integer apply(Integer t1, Integer t2) throws Exception {
15511552
pp2.onNext(2);
15521553
ts.assertResult(3);
15531554
}
1555+
1556+
@Test
1557+
public void fusedNullCheck() {
1558+
TestSubscriber<Integer> ts = SubscriberFusion.newTest(QueueFuseable.ASYNC);
1559+
1560+
Flowable.combineLatest(Flowable.just(1), Flowable.just(2), new BiFunction<Integer, Integer, Integer>() {
1561+
@Override
1562+
public Integer apply(Integer t1, Integer t2) throws Exception {
1563+
return null;
1564+
}
1565+
})
1566+
.subscribe(ts);
1567+
1568+
ts
1569+
.assertOf(SubscriberFusion.<Integer>assertFusionMode(QueueFuseable.ASYNC))
1570+
.assertFailureAndMessage(NullPointerException.class, "The combiner returned a null value");
1571+
}
15541572
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableDelayTest.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1018,4 +1018,15 @@ public void onComplete() {
10181018
}
10191019
}
10201020

1021+
@Test
1022+
public void itemDelayReturnsNull() {
1023+
Flowable.just(1).delay(new Function<Integer, Publisher<Object>>() {
1024+
@Override
1025+
public Publisher<Object> apply(Integer t) throws Exception {
1026+
return null;
1027+
}
1028+
})
1029+
.test()
1030+
.assertFailureAndMessage(NullPointerException.class, "The itemDelay returned a null Publisher");
1031+
}
10211032
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapTest.java

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public void testFlatMapTransformsOnErrorFuncThrows() {
264264

265265
source.flatMap(just(onNext), funcThrow((Throwable) null, onError), just0(onComplete)).subscribe(o);
266266

267-
verify(o).onError(any(TestException.class));
267+
verify(o).onError(any(CompositeException.class));
268268
verify(o, never()).onNext(any());
269269
verify(o, never()).onComplete();
270270
}
@@ -997,4 +997,40 @@ public void run() {
997997
}
998998
}
999999
}
1000+
1001+
@Test
1002+
public void iterableMapperFunctionReturnsNull() {
1003+
Flowable.just(1)
1004+
.flatMapIterable(new Function<Integer, Iterable<Object>>() {
1005+
@Override
1006+
public Iterable<Object> apply(Integer v) throws Exception {
1007+
return null;
1008+
}
1009+
}, new BiFunction<Integer, Object, Object>() {
1010+
@Override
1011+
public Object apply(Integer v, Object w) throws Exception {
1012+
return v;
1013+
}
1014+
})
1015+
.test()
1016+
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Iterable");
1017+
}
1018+
1019+
@Test
1020+
public void combinerMapperFunctionReturnsNull() {
1021+
Flowable.just(1)
1022+
.flatMap(new Function<Integer, Publisher<Object>>() {
1023+
@Override
1024+
public Publisher<Object> apply(Integer v) throws Exception {
1025+
return null;
1026+
}
1027+
}, new BiFunction<Integer, Object, Object>() {
1028+
@Override
1029+
public Object apply(Integer v, Object w) throws Exception {
1030+
return v;
1031+
}
1032+
})
1033+
.test()
1034+
.assertFailureAndMessage(NullPointerException.class, "The mapper returned a null Publisher");
1035+
}
10001036
}

0 commit comments

Comments
 (0)