Skip to content

Commit 470eddb

Browse files
authored
2.x: count, elementAt, ingoreElements, last, single, reduce, reduceWith (#4576)
to return non-Flowable
1 parent 71ddf4b commit 470eddb

File tree

58 files changed

+1895
-491
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+1895
-491
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 46 additions & 133 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Maybe.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3159,7 +3159,7 @@ public final Maybe<T> retry() {
31593159
*/
31603160
@SchedulerSupport(SchedulerSupport.NONE)
31613161
public final Maybe<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
3162-
return toFlowable().retry(predicate).toMaybe();
3162+
return toFlowable().retry(predicate).singleElement();
31633163
}
31643164

31653165
/**
@@ -3199,7 +3199,7 @@ public final Maybe<T> retry(long count) {
31993199
*/
32003200
@SchedulerSupport(SchedulerSupport.NONE)
32013201
public final Maybe<T> retry(long times, Predicate<? super Throwable> predicate) {
3202-
return toFlowable().retry(times, predicate).toMaybe();
3202+
return toFlowable().retry(times, predicate).singleElement();
32033203
}
32043204

32053205
/**
@@ -3283,7 +3283,7 @@ public final Maybe<T> retryUntil(final BooleanSupplier stop) {
32833283
@SchedulerSupport(SchedulerSupport.NONE)
32843284
public final Maybe<T> retryWhen(
32853285
final Function<? super Flowable<? extends Throwable>, ? extends Publisher<?>> handler) {
3286-
return toFlowable().retryWhen(handler).toMaybe();
3286+
return toFlowable().retryWhen(handler).singleElement();
32873287
}
32883288

32893289
/**

src/main/java/io/reactivex/Single.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,7 @@ public static <T> Single<T> fromCallable(final Callable<? extends T> callable) {
453453
*/
454454
@SchedulerSupport(SchedulerSupport.NONE)
455455
public static <T> Single<T> fromFuture(Future<? extends T> future) {
456-
return Flowable.<T>fromFuture(future).toSingle();
456+
return toSingle(Flowable.<T>fromFuture(future));
457457
}
458458

459459
/**
@@ -485,7 +485,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future) {
485485
*/
486486
@SchedulerSupport(SchedulerSupport.NONE)
487487
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit) {
488-
return Flowable.<T>fromFuture(future, timeout, unit).toSingle();
488+
return toSingle(Flowable.<T>fromFuture(future, timeout, unit));
489489
}
490490

491491
/**
@@ -519,7 +519,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout,
519519
*/
520520
@SchedulerSupport(SchedulerSupport.CUSTOM)
521521
public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout, TimeUnit unit, Scheduler scheduler) {
522-
return Flowable.<T>fromFuture(future, timeout, unit, scheduler).toSingle();
522+
return toSingle(Flowable.<T>fromFuture(future, timeout, unit, scheduler));
523523
}
524524

525525
/**
@@ -548,7 +548,7 @@ public static <T> Single<T> fromFuture(Future<? extends T> future, long timeout,
548548
*/
549549
@SchedulerSupport(SchedulerSupport.CUSTOM)
550550
public static <T> Single<T> fromFuture(Future<? extends T> future, Scheduler scheduler) {
551-
return Flowable.<T>fromFuture(future, scheduler).toSingle();
551+
return toSingle(Flowable.<T>fromFuture(future, scheduler));
552552
}
553553

554554
/**
@@ -978,7 +978,7 @@ public static <T> Single<T> wrap(SingleSource<T> source) {
978978
@SchedulerSupport(SchedulerSupport.NONE)
979979
public static <T, R> Single<R> zip(final Iterable<? extends SingleSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper) {
980980
ObjectHelper.requireNonNull(sources, "sources is null");
981-
return Flowable.zipIterable(SingleInternalHelper.iterableToFlowable(sources), zipper, false, 1).toSingle();
981+
return toSingle(Flowable.zipIterable(SingleInternalHelper.iterableToFlowable(sources), zipper, false, 1));
982982
}
983983

984984
/**
@@ -1418,7 +1418,7 @@ public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R>
14181418
sourcePublishers[i] = RxJavaPlugins.onAssembly(new SingleToFlowable<T>(s));
14191419
i++;
14201420
}
1421-
return Flowable.zipArray(zipper, false, 1, sourcePublishers).toSingle();
1421+
return toSingle(Flowable.zipArray(zipper, false, 1, sourcePublishers));
14221422
}
14231423

14241424
/**
@@ -2252,7 +2252,7 @@ public final Flowable<T> repeatUntil(BooleanSupplier stop) {
22522252
*/
22532253
@SchedulerSupport(SchedulerSupport.NONE)
22542254
public final Single<T> retry() {
2255-
return toFlowable().retry().toSingle();
2255+
return toSingle(toFlowable().retry());
22562256
}
22572257

22582258
/**
@@ -2268,7 +2268,7 @@ public final Single<T> retry() {
22682268
*/
22692269
@SchedulerSupport(SchedulerSupport.NONE)
22702270
public final Single<T> retry(long times) {
2271-
return toFlowable().retry(times).toSingle();
2271+
return toSingle(toFlowable().retry(times));
22722272
}
22732273

22742274
/**
@@ -2285,7 +2285,7 @@ public final Single<T> retry(long times) {
22852285
*/
22862286
@SchedulerSupport(SchedulerSupport.NONE)
22872287
public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate) {
2288-
return toFlowable().retry(predicate).toSingle();
2288+
return toSingle(toFlowable().retry(predicate));
22892289
}
22902290

22912291
/**
@@ -2302,7 +2302,7 @@ public final Single<T> retry(BiPredicate<? super Integer, ? super Throwable> pre
23022302
*/
23032303
@SchedulerSupport(SchedulerSupport.NONE)
23042304
public final Single<T> retry(Predicate<? super Throwable> predicate) {
2305-
return toFlowable().retry(predicate).toSingle();
2305+
return toSingle(toFlowable().retry(predicate));
23062306
}
23072307

23082308
/**
@@ -2323,7 +2323,7 @@ public final Single<T> retry(Predicate<? super Throwable> predicate) {
23232323
*/
23242324
@SchedulerSupport(SchedulerSupport.NONE)
23252325
public final Single<T> retryWhen(Function<? super Flowable<? extends Throwable>, ? extends Publisher<Object>> handler) {
2326-
return toFlowable().retryWhen(handler).toSingle();
2326+
return toSingle(toFlowable().retryWhen(handler));
23272327
}
23282328

23292329
/**
@@ -2844,4 +2844,8 @@ public final TestObserver<T> test(boolean cancelled) {
28442844
subscribe(ts);
28452845
return ts;
28462846
}
2847+
2848+
private static <T> Single<T> toSingle(Flowable<T> source) {
2849+
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(source, null));
2850+
}
28472851
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.fuseable.FuseToFlowable;
21+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
24+
public final class FlowableCountSingle<T> extends Single<Long> implements FuseToFlowable<Long> {
25+
26+
final Publisher<T> source;
27+
28+
public FlowableCountSingle(Publisher<T> source) {
29+
this.source = source;
30+
}
31+
32+
@Override
33+
protected void subscribeActual(SingleObserver<? super Long> s) {
34+
source.subscribe(new CountSubscriber(s));
35+
}
36+
37+
@Override
38+
public Flowable<Long> fuseToFlowable() {
39+
return RxJavaPlugins.onAssembly(new FlowableCount<T>(source));
40+
}
41+
42+
static final class CountSubscriber implements Subscriber<Object>, Disposable {
43+
44+
final SingleObserver<? super Long> actual;
45+
46+
Subscription s;
47+
48+
long count;
49+
50+
CountSubscriber(SingleObserver<? super Long> actual) {
51+
this.actual = actual;
52+
}
53+
54+
@Override
55+
public void onSubscribe(Subscription s) {
56+
if (SubscriptionHelper.validate(this.s, s)) {
57+
this.s = s;
58+
actual.onSubscribe(this);
59+
s.request(Long.MAX_VALUE);
60+
}
61+
}
62+
63+
@Override
64+
public void onNext(Object t) {
65+
count++;
66+
}
67+
68+
@Override
69+
public void onError(Throwable t) {
70+
s = SubscriptionHelper.CANCELLED;
71+
actual.onError(t);
72+
}
73+
74+
@Override
75+
public void onComplete() {
76+
s = SubscriptionHelper.CANCELLED;
77+
actual.onSuccess(count);
78+
}
79+
80+
@Override
81+
public void dispose() {
82+
s.cancel();
83+
s = SubscriptionHelper.CANCELLED;
84+
}
85+
86+
@Override
87+
public boolean isDisposed() {
88+
return s == SubscriptionHelper.CANCELLED;
89+
}
90+
}
91+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAt.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.reactivestreams.*;
1717

1818
import io.reactivex.internal.subscriptions.*;
19+
import io.reactivex.plugins.RxJavaPlugins;
1920

2021
public final class FlowableElementAt<T> extends AbstractFlowableWithUpstream<T, T> {
2122
final long index;
@@ -77,6 +78,7 @@ public void onNext(T t) {
7778
@Override
7879
public void onError(Throwable t) {
7980
if (done) {
81+
RxJavaPlugins.onError(t);
8082
return;
8183
}
8284
done = true;
@@ -89,7 +91,7 @@ public void onComplete() {
8991
done = true;
9092
T v = defaultValue;
9193
if (v == null) {
92-
actual.onError(new IndexOutOfBoundsException());
94+
actual.onComplete();
9395
} else {
9496
complete(v);
9597
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import org.reactivestreams.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.fuseable.FuseToFlowable;
21+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
22+
import io.reactivex.plugins.RxJavaPlugins;
23+
24+
public final class FlowableElementAtMaybe<T> extends Maybe<T> implements FuseToFlowable<T> {
25+
final Publisher<T> source;
26+
27+
final long index;
28+
29+
public FlowableElementAtMaybe(Publisher<T> source, long index) {
30+
this.source = source;
31+
this.index = index;
32+
}
33+
34+
@Override
35+
protected void subscribeActual(MaybeObserver<? super T> s) {
36+
source.subscribe(new ElementAtSubscriber<T>(s, index));
37+
}
38+
39+
@Override
40+
public Flowable<T> fuseToFlowable() {
41+
return RxJavaPlugins.onAssembly(new FlowableElementAt<T>(source, index, null));
42+
}
43+
44+
static final class ElementAtSubscriber<T> implements Subscriber<T>, Disposable {
45+
46+
final MaybeObserver<? super T> actual;
47+
48+
final long index;
49+
50+
Subscription s;
51+
52+
long count;
53+
54+
boolean done;
55+
56+
ElementAtSubscriber(MaybeObserver<? super T> actual, long index) {
57+
this.actual = actual;
58+
this.index = index;
59+
}
60+
61+
@Override
62+
public void onSubscribe(Subscription s) {
63+
if (SubscriptionHelper.validate(this.s, s)) {
64+
this.s = s;
65+
actual.onSubscribe(this);
66+
s.request(Long.MAX_VALUE);
67+
}
68+
}
69+
70+
@Override
71+
public void onNext(T t) {
72+
if (done) {
73+
return;
74+
}
75+
long c = count;
76+
if (c == index) {
77+
done = true;
78+
s.cancel();
79+
s = SubscriptionHelper.CANCELLED;
80+
actual.onSuccess(t);
81+
return;
82+
}
83+
count = c + 1;
84+
}
85+
86+
@Override
87+
public void onError(Throwable t) {
88+
if (done) {
89+
RxJavaPlugins.onError(t);
90+
return;
91+
}
92+
done = true;
93+
s = SubscriptionHelper.CANCELLED;
94+
actual.onError(t);
95+
}
96+
97+
@Override
98+
public void onComplete() {
99+
s = SubscriptionHelper.CANCELLED;
100+
if (index <= count && !done) {
101+
done = true;
102+
actual.onComplete();
103+
}
104+
}
105+
106+
@Override
107+
public void dispose() {
108+
s.cancel();
109+
s = SubscriptionHelper.CANCELLED;
110+
}
111+
112+
@Override
113+
public boolean isDisposed() {
114+
return s == SubscriptionHelper.CANCELLED;
115+
}
116+
117+
}
118+
}

0 commit comments

Comments
 (0)