Skip to content

Commit 6291f59

Browse files
authored
2.x: factor out inner classes from the base reactive types (#4360)
1 parent 176346a commit 6291f59

File tree

10 files changed

+1493
-874
lines changed

10 files changed

+1493
-874
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -323,13 +323,7 @@ public static Completable fromCallable(final Callable<?> callable) {
323323
@SchedulerSupport(SchedulerSupport.NONE)
324324
public static Completable fromFuture(final Future<?> future) {
325325
Objects.requireNonNull(future, "future is null");
326-
return fromCallable(new Callable<Object>() {
327-
@Override
328-
public Object call() throws Exception {
329-
future.get();
330-
return null;
331-
}
332-
});
326+
return fromAction(Functions.futureAction(future));
333327
}
334328

335329
/**

src/main/java/io/reactivex/Flowable.java

Lines changed: 84 additions & 421 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 63 additions & 377 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 8 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,14 @@
1313

1414
package io.reactivex;
1515

16-
import java.util.*;
1716
import java.util.concurrent.*;
1817

1918
import org.reactivestreams.*;
2019

2120
import io.reactivex.disposables.Disposable;
2221
import io.reactivex.exceptions.Exceptions;
2322
import io.reactivex.functions.*;
24-
import io.reactivex.internal.functions.Functions;
25-
import io.reactivex.internal.functions.Objects;
23+
import io.reactivex.internal.functions.*;
2624
import io.reactivex.internal.operators.completable.CompletableFromSingle;
2725
import io.reactivex.internal.operators.flowable.*;
2826
import io.reactivex.internal.operators.flowable.FlowableConcatMap.ErrorMode;
@@ -87,12 +85,7 @@ public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends
8785
@SuppressWarnings("unchecked")
8886
public static <T> Single<T> amb(final SingleSource<? extends T>... sources) {
8987
if (sources.length == 0) {
90-
return error(new Callable<Throwable>() {
91-
@Override
92-
public Throwable call() {
93-
return new NoSuchElementException();
94-
}
95-
});
88+
return error(SingleInternalHelper.<T>emptyThrower());
9689
}
9790
if (sources.length == 1) {
9891
return wrap((SingleSource<T>)sources[0]);
@@ -130,13 +123,7 @@ public static <T> Flowable<T> concat(Iterable<? extends SingleSource<? extends T
130123
*/
131124
@SuppressWarnings({ "unchecked", "rawtypes" })
132125
public static <T> Flowable<T> concat(Publisher<? extends SingleSource<? extends T>> sources) {
133-
return new FlowableConcatMap(sources,
134-
new Function<SingleSource<? extends T>, Publisher<? extends T>>() {
135-
@Override
136-
public Publisher<? extends T> apply(SingleSource<? extends T> v){
137-
return new SingleToFlowable<T>(v);
138-
}
139-
}, 2, ErrorMode.IMMEDIATE);
126+
return new FlowableConcatMap(sources, SingleInternalHelper.toFlowable(), 2, ErrorMode.IMMEDIATE);
140127
}
141128

142129
/**
@@ -520,12 +507,7 @@ public static <T> Single<T> error(final Callable<? extends Throwable> errorSuppl
520507
*/
521508
public static <T> Single<T> error(final Throwable exception) {
522509
Objects.requireNonNull(exception, "error is null");
523-
return error(new Callable<Throwable>() {
524-
@Override
525-
public Throwable call() {
526-
return exception;
527-
}
528-
});
510+
return error(Functions.justCallable(exception));
529511
}
530512

531513
/**
@@ -740,13 +722,7 @@ public static <T> Flowable<T> merge(Iterable<? extends SingleSource<? extends T>
740722
*/
741723
@SuppressWarnings({ "unchecked", "rawtypes" })
742724
public static <T> Flowable<T> merge(Publisher<? extends SingleSource<? extends T>> sources) {
743-
return new FlowableFlatMap(sources,
744-
new Function<SingleSource<? extends T>, Publisher<? extends T>>() {
745-
@Override
746-
public Publisher<? extends T> apply(SingleSource<? extends T> v){
747-
return new SingleToFlowable<T>(v);
748-
}
749-
}, false, Integer.MAX_VALUE, Flowable.bufferSize());
725+
return new FlowableFlatMap(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize());
750726
}
751727

752728
/**
@@ -1289,31 +1265,7 @@ static <T> Single<T> wrap(SingleSource<T> source) {
12891265
*/
12901266
public static <T, R> Single<R> zip(final Iterable<? extends SingleSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
12911267
Objects.requireNonNull(sources, "sources is null");
1292-
1293-
Iterable<? extends Flowable<T>> it = new Iterable<Flowable<T>>() {
1294-
@Override
1295-
public Iterator<Flowable<T>> iterator() {
1296-
final Iterator<? extends SingleSource<? extends T>> sit = sources.iterator();
1297-
return new Iterator<Flowable<T>>() {
1298-
1299-
@Override
1300-
public boolean hasNext() {
1301-
return sit.hasNext();
1302-
}
1303-
1304-
@Override
1305-
public Flowable<T> next() {
1306-
return new SingleToFlowable<T>(sit.next());
1307-
}
1308-
1309-
@Override
1310-
public void remove() {
1311-
throw new UnsupportedOperationException();
1312-
}
1313-
};
1314-
}
1315-
};
1316-
return Flowable.zipIterable(it, zipper, false, 1).toSingle();
1268+
return Flowable.zipIterable(SingleInternalHelper.iterableToFlowable(sources), zipper, false, 1).toSingle();
13171269
}
13181270

13191271
/**
@@ -1826,12 +1778,7 @@ public final Single<T> cache() {
18261778
*/
18271779
public final <U> Single<U> cast(final Class<? extends U> clazz) {
18281780
Objects.requireNonNull(clazz, "clazz is null");
1829-
return map(new Function<T, U>() {
1830-
@Override
1831-
public U apply(T v) {
1832-
return clazz.cast(v);
1833-
}
1834-
});
1781+
return map(Functions.castFunction(clazz));
18351782
}
18361783

18371784
/**
@@ -2345,12 +2292,7 @@ public final Single<T> onErrorReturnValue(final T value) {
23452292
*/
23462293
public final Single<T> onErrorResumeNext(final Single<? extends T> resumeSingleInCaseOfError) {
23472294
Objects.requireNonNull(resumeSingleInCaseOfError, "resumeSingleInCaseOfError is null");
2348-
return onErrorResumeNext(new Function<Throwable, Single<? extends T>>() {
2349-
@Override
2350-
public Single<? extends T> apply(Throwable t) throws Exception {
2351-
return resumeSingleInCaseOfError;
2352-
}
2353-
});
2295+
return onErrorResumeNext(Functions.justFunction(resumeSingleInCaseOfError));
23542296
}
23552297

23562298
/**

0 commit comments

Comments
 (0)