Skip to content

Commit ccfa4bf

Browse files
authored
2.x: More Completable marbles, add C.fromMaybe (#6085)
1 parent c8bcb3c commit ccfa4bf

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,8 @@ public static Completable error(final Throwable error) {
388388
/**
389389
* Returns a Completable instance that runs the given Action for each subscriber and
390390
* emits either an unchecked exception or simply completes.
391+
* <p>
392+
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromAction.png" alt="">
391393
* <dl>
392394
* <dt><b>Scheduler:</b></dt>
393395
* <dd>{@code fromAction} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -406,6 +408,8 @@ public static Completable fromAction(final Action run) {
406408
/**
407409
* Returns a Completable which when subscribed, executes the callable function, ignores its
408410
* normal result and emits onError or onComplete only.
411+
* <p>
412+
* <img width="640" height="286" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromCallable.png" alt="">
409413
* <dl>
410414
* <dt><b>Scheduler:</b></dt>
411415
* <dd>{@code fromCallable} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -423,6 +427,8 @@ public static Completable fromCallable(final Callable<?> callable) {
423427
/**
424428
* Returns a Completable instance that reacts to the termination of the given Future in a blocking fashion.
425429
* <p>
430+
* <img width="640" height="628" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromFuture.png" alt="">
431+
* <p>
426432
* Note that cancellation from any of the subscribers to this Completable will cancel the future.
427433
* <dl>
428434
* <dt><b>Scheduler:</b></dt>
@@ -438,9 +444,33 @@ public static Completable fromFuture(final Future<?> future) {
438444
return fromAction(Functions.futureAction(future));
439445
}
440446

447+
/**
448+
* Returns a Completable instance that when subscribed to, subscribes to the {@code Maybe} instance and
449+
* emits a completion event if the maybe emits {@code onSuccess}/{@code onComplete} or forwards any
450+
* {@code onError} events.
451+
* <p>
452+
* <img width="640" height="235" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromMaybe.png" alt="">
453+
* <dl>
454+
* <dt><b>Scheduler:</b></dt>
455+
* <dd>{@code fromMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
456+
* </dl>
457+
* @param <T> the value type of the {@link MaybeSource} element
458+
* @param maybe the Maybe instance to subscribe to, not null
459+
* @return the new Completable instance
460+
* @throws NullPointerException if single is null
461+
*/
462+
@CheckReturnValue
463+
@SchedulerSupport(SchedulerSupport.NONE)
464+
public static <T> Completable fromMaybe(final MaybeSource<T> maybe) {
465+
ObjectHelper.requireNonNull(maybe, "maybe is null");
466+
return RxJavaPlugins.onAssembly(new MaybeIgnoreElementCompletable<T>(maybe));
467+
}
468+
441469
/**
442470
* Returns a Completable instance that runs the given Runnable for each subscriber and
443471
* emits either its exception or simply completes.
472+
* <p>
473+
* <img width="640" height="297" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromRunnable.png" alt="">
444474
* <dl>
445475
* <dt><b>Scheduler:</b></dt>
446476
* <dd>{@code fromRunnable} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -459,6 +489,8 @@ public static Completable fromRunnable(final Runnable run) {
459489
/**
460490
* Returns a Completable instance that subscribes to the given Observable, ignores all values and
461491
* emits only the terminal event.
492+
* <p>
493+
* <img width="640" height="414" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromObservable.png" alt="">
462494
* <dl>
463495
* <dt><b>Scheduler:</b></dt>
464496
* <dd>{@code fromObservable} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -479,6 +511,8 @@ public static <T> Completable fromObservable(final ObservableSource<T> observabl
479511
* Returns a Completable instance that subscribes to the given publisher, ignores all values and
480512
* emits only the terminal event.
481513
* <p>
514+
* <img width="640" height="442" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromPublisher.png" alt="">
515+
* <p>
482516
* The {@link Publisher} must follow the
483517
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams">Reactive-Streams specification</a>.
484518
* Violating the specification may result in undefined behavior.
@@ -513,6 +547,8 @@ public static <T> Completable fromPublisher(final Publisher<T> publisher) {
513547
/**
514548
* Returns a Completable instance that when subscribed to, subscribes to the Single instance and
515549
* emits a completion event if the single emits onSuccess or forwards any onError events.
550+
* <p>
551+
* <img width="640" height="356" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.fromSingle.png" alt="">
516552
* <dl>
517553
* <dt><b>Scheduler:</b></dt>
518554
* <dd>{@code fromSingle} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -532,6 +568,8 @@ public static <T> Completable fromSingle(final SingleSource<T> single) {
532568
/**
533569
* Returns a Completable instance that subscribes to all sources at once and
534570
* completes only when all source Completables complete or one of them emits an error.
571+
* <p>
572+
* <img width="640" height="270" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArray.png" alt="">
535573
* <dl>
536574
* <dt><b>Scheduler:</b></dt>
537575
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -569,6 +607,8 @@ public static Completable mergeArray(CompletableSource... sources) {
569607
/**
570608
* Returns a Completable instance that subscribes to all sources at once and
571609
* completes only when all source Completables complete or one of them emits an error.
610+
* <p>
611+
* <img width="640" height="311" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.png" alt="">
572612
* <dl>
573613
* <dt><b>Scheduler:</b></dt>
574614
* <dd>{@code merge} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -601,6 +641,8 @@ public static Completable merge(Iterable<? extends CompletableSource> sources) {
601641
/**
602642
* Returns a Completable instance that subscribes to all sources at once and
603643
* completes only when all source Completables complete or one of them emits an error.
644+
* <p>
645+
* <img width="640" height="336" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.p.png" alt="">
604646
* <dl>
605647
* <dt><b>Backpressure:</b></dt>
606648
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
@@ -635,6 +677,8 @@ public static Completable merge(Publisher<? extends CompletableSource> sources)
635677
/**
636678
* Returns a Completable instance that keeps subscriptions to a limited number of sources at once and
637679
* completes only when all source Completables complete or one of them emits an error.
680+
* <p>
681+
* <img width="640" height="269" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.merge.pn.png" alt="">
638682
* <dl>
639683
* <dt><b>Backpressure:</b></dt>
640684
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
@@ -699,6 +743,8 @@ private static Completable merge0(Publisher<? extends CompletableSource> sources
699743
* Returns a CompletableConsumable that subscribes to all Completables in the source array and delays
700744
* any error emitted by either the sources observable or any of the inner Completables until all of
701745
* them terminate in a way or another.
746+
* <p>
747+
* <img width="640" height="430" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeArrayDelayError.png" alt="">
702748
* <dl>
703749
* <dt><b>Scheduler:</b></dt>
704750
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -718,6 +764,8 @@ public static Completable mergeArrayDelayError(CompletableSource... sources) {
718764
* Returns a Completable that subscribes to all Completables in the source sequence and delays
719765
* any error emitted by either the sources observable or any of the inner Completables until all of
720766
* them terminate in a way or another.
767+
* <p>
768+
* <img width="640" height="475" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.png" alt="">
721769
* <dl>
722770
* <dt><b>Scheduler:</b></dt>
723771
* <dd>{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -738,6 +786,8 @@ public static Completable mergeDelayError(Iterable<? extends CompletableSource>
738786
* Returns a Completable that subscribes to all Completables in the source sequence and delays
739787
* any error emitted by either the sources observable or any of the inner Completables until all of
740788
* them terminate in a way or another.
789+
* <p>
790+
* <img width="640" height="466" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.p.png" alt="">
741791
* <dl>
742792
* <dt><b>Backpressure:</b></dt>
743793
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
@@ -761,6 +811,8 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
761811
* the source sequence and delays any error emitted by either the sources
762812
* observable or any of the inner Completables until all of
763813
* them terminate in a way or another.
814+
* <p>
815+
* <img width="640" height="440" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.mergeDelayError.pn.png" alt="">
764816
* <dl>
765817
* <dt><b>Backpressure:</b></dt>
766818
* <dd>The returned {@code Completable} honors the backpressure of the downstream consumer
@@ -782,6 +834,8 @@ public static Completable mergeDelayError(Publisher<? extends CompletableSource>
782834

783835
/**
784836
* Returns a Completable that never calls onError or onComplete.
837+
* <p>
838+
* <img width="640" height="512" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.never.png" alt="">
785839
* <dl>
786840
* <dt><b>Scheduler:</b></dt>
787841
* <dd>{@code never} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -796,6 +850,8 @@ public static Completable never() {
796850

797851
/**
798852
* Returns a Completable instance that fires its onComplete event after the given delay elapsed.
853+
* <p>
854+
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.png" alt="">
799855
* <dl>
800856
* <dt><b>Scheduler:</b></dt>
801857
* <dd>{@code timer} does operate by default on the {@code computation} {@link Scheduler}.</dd>
@@ -813,6 +869,8 @@ public static Completable timer(long delay, TimeUnit unit) {
813869
/**
814870
* Returns a Completable instance that fires its onComplete event after the given delay elapsed
815871
* by using the supplied scheduler.
872+
* <p>
873+
* <img width="640" height="413" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.timer.s.png" alt="">
816874
* <dl>
817875
* <dt><b>Scheduler:</b></dt>
818876
* <dd>{@code timer} operates on the {@link Scheduler} you specify.</dd>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.completable;
15+
16+
import io.reactivex.Completable;
17+
import io.reactivex.Maybe;
18+
import org.junit.Test;
19+
20+
public class CompletableFromMaybeTest {
21+
@Test(expected = NullPointerException.class)
22+
public void fromMaybeNull() {
23+
Completable.fromMaybe(null);
24+
}
25+
26+
@Test
27+
public void fromMaybe() {
28+
Completable.fromMaybe(Maybe.just(1))
29+
.test()
30+
.assertResult();
31+
}
32+
33+
@Test
34+
public void fromMaybeEmpty() {
35+
Completable.fromMaybe(Maybe.<Integer>empty())
36+
.test()
37+
.assertResult();
38+
}
39+
40+
@Test
41+
public void fromMaybeError() {
42+
Completable.fromMaybe(Maybe.error(new UnsupportedOperationException()))
43+
.test()
44+
.assertFailure(UnsupportedOperationException.class);
45+
}
46+
}

0 commit comments

Comments
 (0)