Skip to content

Commit b733386

Browse files
committed
add Observable.switchMapSingle
1 parent 23b46f5 commit b733386

File tree

3 files changed

+152
-1
lines changed

3 files changed

+152
-1
lines changed

src/main/java/io/reactivex/Observable.java

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10865,6 +10865,67 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Observabl
1086510865
return RxJavaPlugins.onAssembly(new ObservableSwitchMap<T, R>(this, mapper, bufferSize, false));
1086610866
}
1086710867

10868+
/**
10869+
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
10870+
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
10871+
* of these SingleSources.
10872+
* <p>
10873+
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
10874+
* If the upstream ObservableSource signals an onError, the inner SingleSource is unsubscribed and the error delivered in-sequence.
10875+
* <p>
10876+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
10877+
* <dl>
10878+
* <dt><b>Scheduler:</b></dt>
10879+
* <dd>{@code switchMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
10880+
* </dl>
10881+
*
10882+
* @param <R> the element type of the inner SingleSources and the output
10883+
* @param mapper
10884+
* a function that, when applied to an item emitted by the source ObservableSource, returns a
10885+
* SingleSource
10886+
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
10887+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
10888+
* @since 2.0.8
10889+
*/
10890+
@Experimental
10891+
@CheckReturnValue
10892+
@SchedulerSupport(SchedulerSupport.NONE)
10893+
@NonNull
10894+
public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
10895+
return ObservableInternalHelper.switchMapSingle(this, mapper);
10896+
}
10897+
10898+
/**
10899+
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
10900+
* ObservableSource that returns a SingleSource, and then emitting the item emitted by the most recently emitted
10901+
* of these SingleSources and delays any error until all SingleSources terminate.
10902+
* <p>
10903+
* The resulting ObservableSource completes if both the upstream ObservableSource and the last inner SingleSource, if any, complete.
10904+
* If the upstream ObservableSource signals an onError, the termination of the last inner SingleSource will emit that error as is
10905+
* or wrapped into a CompositeException along with the other possible errors the former inner SingleSources signalled.
10906+
* <p>
10907+
* <img width="640" height="350" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchMap.png" alt="">
10908+
* <dl>
10909+
* <dt><b>Scheduler:</b></dt>
10910+
* <dd>{@code switchMapSingleDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
10911+
* </dl>
10912+
*
10913+
* @param <R> the element type of the inner SingleSources and the output
10914+
* @param mapper
10915+
* a function that, when applied to an item emitted by the source ObservableSource, returns a
10916+
* SingleSource
10917+
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
10918+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
10919+
* @since 2.0.8
10920+
*/
10921+
@Experimental
10922+
@CheckReturnValue
10923+
@SchedulerSupport(SchedulerSupport.NONE)
10924+
@NonNull
10925+
public final <R> Observable<R> switchMapSingleDelayError(@NonNull Function<? super T, ? extends SingleSource<? extends R>> mapper) {
10926+
return ObservableInternalHelper.switchMapSingleDelayError(this, mapper);
10927+
}
10928+
1086810929
/**
1086910930
* Returns a new ObservableSource by applying a function that you supply to each item emitted by the source
1087010931
* ObservableSource that returns an ObservableSource, and then emitting the items emitted by the most recently emitted

src/main/java/io/reactivex/internal/operators/observable/ObservableInternalHelper.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
import io.reactivex.*;
1919
import io.reactivex.functions.*;
2020
import io.reactivex.internal.functions.Functions;
21+
import io.reactivex.internal.functions.ObjectHelper;
22+
import io.reactivex.internal.operators.single.SingleToObservable;
2123
import io.reactivex.observables.ConnectableObservable;
24+
import io.reactivex.plugins.RxJavaPlugins;
2225

2326
/**
2427
* Helper utility class to support Observable with inner classes.
@@ -315,4 +318,34 @@ public static <T, R> Function<List<ObservableSource<? extends T>>, ObservableSou
315318
return new ZipIterableFunction<T, R>(zipper);
316319
}
317320

321+
public static <T,R> Observable<R> switchMapSingle(Observable<T> source, final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
322+
return source.switchMap(convertSingleMapperToObservableMapper(mapper), 1);
323+
}
324+
325+
public static <T,R> Observable<R> switchMapSingleDelayError(Observable<T> source,
326+
Function<? super T, ? extends SingleSource<? extends R>> mapper) {
327+
return source.switchMapDelayError(convertSingleMapperToObservableMapper(mapper), 1);
328+
}
329+
330+
private static <T, R> Function<T, Observable<R>> convertSingleMapperToObservableMapper(
331+
final Function<? super T, ? extends SingleSource<? extends R>> mapper) {
332+
ObjectHelper.requireNonNull(mapper, "mapper is null");
333+
return new ObservableMapper<T,R>(mapper);
334+
}
335+
336+
static final class ObservableMapper<T,R> implements Function<T,Observable<R>> {
337+
338+
final Function<? super T, ? extends SingleSource<? extends R>> mapper;
339+
340+
ObservableMapper(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
341+
this.mapper = mapper;
342+
}
343+
344+
@Override
345+
public Observable<R> apply(T t) throws Exception {
346+
return RxJavaPlugins.onAssembly(new SingleToObservable<R>(mapper.apply(t)));
347+
}
348+
349+
}
350+
318351
}

src/test/java/io/reactivex/internal/operators/observable/ObservableSwitchTest.java

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.*;
2727
import io.reactivex.disposables.*;
2828
import io.reactivex.exceptions.*;
29+
import io.reactivex.functions.Consumer;
2930
import io.reactivex.functions.Function;
3031
import io.reactivex.internal.functions.Functions;
3132
import io.reactivex.internal.util.ExceptionHelper;
@@ -579,7 +580,6 @@ public ObservableSource<Integer> apply(Object v) throws Exception {
579580
}, 16)
580581
.test()
581582
.assertResult(1);
582-
583583
}
584584

585585
@Test
@@ -622,7 +622,64 @@ public void switchMapInnerCancelled() {
622622

623623
assertFalse(pp.hasObservers());
624624
}
625+
626+
@Test
627+
public void switchMapSingleJustSource() {
628+
Observable.just(0)
629+
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
630+
@Override
631+
public SingleSource<Integer> apply(Object v) throws Exception {
632+
return Single.just(1);
633+
}
634+
})
635+
.test()
636+
.assertResult(1);
637+
}
638+
639+
@Test
640+
public void switchMapSingleFunctionDoesntReturnSingle() {
641+
Observable.just(0)
642+
.switchMapSingle(new Function<Object, SingleSource<Integer>>() {
643+
@Override
644+
public SingleSource<Integer> apply(Object v) throws Exception {
645+
return new SingleSource<Integer>() {
646+
@Override
647+
public void subscribe(SingleObserver<? super Integer> s) {
648+
s.onSubscribe(Disposables.empty());
649+
s.onSuccess(1);
650+
}
651+
};
652+
}
653+
})
654+
.test()
655+
.assertResult(1);
656+
}
625657

658+
@Test
659+
public void switchMapSingleDelayErrorJustSource() {
660+
final AtomicBoolean completed = new AtomicBoolean();
661+
Observable.just(0, 1)
662+
.switchMapSingleDelayError(new Function<Integer, SingleSource<Integer>>() {
663+
@Override
664+
public SingleSource<Integer> apply(Integer v) throws Exception {
665+
if (v == 0) {
666+
return Single.error(new RuntimeException());
667+
} else {
668+
return Single.just(1).doOnSuccess(new Consumer<Integer>() {
669+
670+
@Override
671+
public void accept(Integer n) throws Exception {
672+
completed.set(true);
673+
}});
674+
}
675+
}
676+
})
677+
.test()
678+
.assertValue(1)
679+
.assertError(RuntimeException.class);
680+
assertTrue(completed.get());
681+
}
682+
626683
@Test
627684
public void scalarMap() {
628685
Observable.switchOnNext(Observable.just(Observable.just(1)))

0 commit comments

Comments
 (0)