Skip to content

Commit 754ed6b

Browse files
committed
2.x: collect, toList, toSortedList, toMap, toMultimap to return Single
1 parent 5d7f549 commit 754ed6b

File tree

64 files changed

+2368
-363
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+2368
-363
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 56 additions & 57 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 55 additions & 55 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/flowable/FlowableAllSingle.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
public final class FlowableAllSingle<T> extends Single<Boolean> implements FuseToFlowable<Boolean> {
2626

2727
final Publisher<T> source;
28-
28+
2929
final Predicate<? super T> predicate;
3030

3131
public FlowableAllSingle(Publisher<T> source, Predicate<? super T> predicate) {
@@ -42,11 +42,11 @@ protected void subscribeActual(SingleObserver<? super Boolean> s) {
4242
public Flowable<Boolean> fuseToFlowable() {
4343
return RxJavaPlugins.onAssembly(new FlowableAll<T>(source, predicate));
4444
}
45-
45+
4646
static final class AllSubscriber<T> implements Subscriber<T>, Disposable {
47-
47+
4848
final SingleObserver<? super Boolean> actual;
49-
49+
5050
final Predicate<? super T> predicate;
5151

5252
Subscription s;
@@ -116,7 +116,7 @@ public void dispose() {
116116
s.cancel();
117117
s = SubscriptionHelper.CANCELLED;
118118
}
119-
119+
120120
@Override
121121
public boolean isDisposed() {
122122
return s == SubscriptionHelper.CANCELLED;

src/main/java/io/reactivex/internal/operators/flowable/FlowableAnySingle.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@
2424

2525
public final class FlowableAnySingle<T> extends Single<Boolean> implements FuseToFlowable<Boolean> {
2626
final Publisher<T> source;
27-
27+
2828
final Predicate<? super T> predicate;
29-
29+
3030
public FlowableAnySingle(Publisher<T> source, Predicate<? super T> predicate) {
3131
this.source = source;
3232
this.predicate = predicate;
@@ -36,7 +36,7 @@ public FlowableAnySingle(Publisher<T> source, Predicate<? super T> predicate) {
3636
protected void subscribeActual(SingleObserver<? super Boolean> s) {
3737
source.subscribe(new AnySubscriber<T>(s, predicate));
3838
}
39-
39+
4040
@Override
4141
public Flowable<Boolean> fuseToFlowable() {
4242
return RxJavaPlugins.onAssembly(new FlowableAny<T>(source, predicate));
@@ -45,7 +45,7 @@ public Flowable<Boolean> fuseToFlowable() {
4545
static final class AnySubscriber<T> implements Subscriber<T>, Disposable {
4646

4747
final SingleObserver<? super Boolean> actual;
48-
48+
4949
final Predicate<? super T> predicate;
5050

5151
Subscription s;
@@ -111,7 +111,7 @@ public void dispose() {
111111
s.cancel();
112112
s = SubscriptionHelper.CANCELLED;
113113
}
114-
114+
115115
@Override
116116
public boolean isDisposed() {
117117
return s == SubscriptionHelper.CANCELLED;
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.flowable;
14+
15+
import java.util.concurrent.Callable;
16+
17+
import org.reactivestreams.*;
18+
19+
import io.reactivex.*;
20+
import io.reactivex.disposables.Disposable;
21+
import io.reactivex.exceptions.Exceptions;
22+
import io.reactivex.functions.BiConsumer;
23+
import io.reactivex.internal.disposables.EmptyDisposable;
24+
import io.reactivex.internal.functions.ObjectHelper;
25+
import io.reactivex.internal.fuseable.FuseToFlowable;
26+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
27+
import io.reactivex.plugins.RxJavaPlugins;
28+
29+
public final class FlowableCollectSingle<T, U> extends Single<U> implements FuseToFlowable<U> {
30+
31+
final Publisher<T> source;
32+
33+
final Callable<? extends U> initialSupplier;
34+
final BiConsumer<? super U, ? super T> collector;
35+
36+
public FlowableCollectSingle(Publisher<T> source, Callable<? extends U> initialSupplier, BiConsumer<? super U, ? super T> collector) {
37+
this.source = source;
38+
this.initialSupplier = initialSupplier;
39+
this.collector = collector;
40+
}
41+
42+
@Override
43+
protected void subscribeActual(SingleObserver<? super U> s) {
44+
U u;
45+
try {
46+
u = ObjectHelper.requireNonNull(initialSupplier.call(), "The initialSupplier returned a null value");
47+
} catch (Throwable e) {
48+
EmptyDisposable.error(e, s);
49+
return;
50+
}
51+
52+
source.subscribe(new CollectSubscriber<T, U>(s, u, collector));
53+
}
54+
55+
@Override
56+
public Flowable<U> fuseToFlowable() {
57+
return RxJavaPlugins.onAssembly(new FlowableCollect<T, U>(source, initialSupplier, collector));
58+
}
59+
60+
static final class CollectSubscriber<T, U> implements Subscriber<T>, Disposable {
61+
62+
final SingleObserver<? super U> actual;
63+
64+
final BiConsumer<? super U, ? super T> collector;
65+
66+
final U u;
67+
68+
Subscription s;
69+
70+
boolean done;
71+
72+
CollectSubscriber(SingleObserver<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
73+
this.actual = actual;
74+
this.collector = collector;
75+
this.u = u;
76+
}
77+
78+
@Override
79+
public void onSubscribe(Subscription s) {
80+
if (SubscriptionHelper.validate(this.s, s)) {
81+
this.s = s;
82+
actual.onSubscribe(this);
83+
s.request(Long.MAX_VALUE);
84+
}
85+
}
86+
87+
@Override
88+
public void onNext(T t) {
89+
if (done) {
90+
return;
91+
}
92+
try {
93+
collector.accept(u, t);
94+
} catch (Throwable e) {
95+
Exceptions.throwIfFatal(e);
96+
s.cancel();
97+
onError(e);
98+
}
99+
}
100+
101+
@Override
102+
public void onError(Throwable t) {
103+
if (done) {
104+
RxJavaPlugins.onError(t);
105+
return;
106+
}
107+
done = true;
108+
s = SubscriptionHelper.CANCELLED;
109+
actual.onError(t);
110+
}
111+
112+
@Override
113+
public void onComplete() {
114+
if (done) {
115+
return;
116+
}
117+
done = true;
118+
s = SubscriptionHelper.CANCELLED;
119+
actual.onSuccess(u);
120+
}
121+
122+
@Override
123+
public void dispose() {
124+
s.cancel();
125+
s = SubscriptionHelper.CANCELLED;
126+
}
127+
128+
@Override
129+
public boolean isDisposed() {
130+
return s == SubscriptionHelper.CANCELLED;
131+
}
132+
}
133+
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,31 +30,31 @@
3030
public final class FlowableLastSingle<T> extends Single<T> {
3131

3232
final Publisher<T> source;
33-
33+
3434
final T defaultItem;
35-
35+
3636
public FlowableLastSingle(Publisher<T> source, T defaultItem) {
3737
this.source = source;
3838
this.defaultItem = defaultItem;
3939
}
4040

4141
// TODO fuse back to Flowable
42-
42+
4343
@Override
4444
protected void subscribeActual(SingleObserver<? super T> observer) {
4545
source.subscribe(new LastSubscriber<T>(observer, defaultItem));
4646
}
47-
47+
4848
static final class LastSubscriber<T> implements Subscriber<T>, Disposable {
49-
49+
5050
final SingleObserver<? super T> actual;
51-
51+
5252
final T defaultItem;
5353

5454
Subscription s;
55-
55+
5656
T item;
57-
57+
5858
public LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
5959
this.actual = actual;
6060
this.defaultItem = defaultItem;
@@ -75,9 +75,9 @@ public boolean isDisposed() {
7575
public void onSubscribe(Subscription s) {
7676
if (SubscriptionHelper.validate(this.s, s)) {
7777
this.s = s;
78-
78+
7979
actual.onSubscribe(this);
80-
80+
8181
s.request(Long.MAX_VALUE);
8282
}
8383
}
@@ -110,7 +110,5 @@ public void onComplete() {
110110
}
111111
}
112112
}
113-
114-
115113
}
116114
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.Collection;
17+
import java.util.concurrent.Callable;
18+
19+
import org.reactivestreams.*;
20+
21+
import io.reactivex.*;
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.exceptions.Exceptions;
24+
import io.reactivex.internal.disposables.EmptyDisposable;
25+
import io.reactivex.internal.fuseable.FuseToFlowable;
26+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
27+
import io.reactivex.internal.util.ArrayListSupplier;
28+
import io.reactivex.plugins.RxJavaPlugins;
29+
30+
public final class FlowableToListSingle<T, U extends Collection<? super T>> extends Single<U> implements FuseToFlowable<U> {
31+
32+
final Publisher<T> source;
33+
34+
final Callable<U> collectionSupplier;
35+
36+
@SuppressWarnings("unchecked")
37+
public FlowableToListSingle(Publisher<T> source) {
38+
this(source, (Callable<U>)ArrayListSupplier.asCallable());
39+
}
40+
41+
public FlowableToListSingle(Publisher<T> source, Callable<U> collectionSupplier) {
42+
this.source = source;
43+
this.collectionSupplier = collectionSupplier;
44+
}
45+
46+
@Override
47+
protected void subscribeActual(SingleObserver<? super U> s) {
48+
U coll;
49+
try {
50+
coll = collectionSupplier.call();
51+
} catch (Throwable e) {
52+
Exceptions.throwIfFatal(e);
53+
EmptyDisposable.error(e, s);
54+
return;
55+
}
56+
source.subscribe(new ToListSubscriber<T, U>(s, coll));
57+
}
58+
59+
@Override
60+
public Flowable<U> fuseToFlowable() {
61+
return RxJavaPlugins.onAssembly(new FlowableToList<T, U>(source, collectionSupplier));
62+
}
63+
64+
static final class ToListSubscriber<T, U extends Collection<? super T>>
65+
implements Subscriber<T>, Disposable {
66+
67+
final SingleObserver<? super U> actual;
68+
69+
Subscription s;
70+
71+
U value;
72+
73+
ToListSubscriber(SingleObserver<? super U> actual, U collection) {
74+
this.actual = actual;
75+
this.value = collection;
76+
}
77+
78+
@Override
79+
public void onSubscribe(Subscription s) {
80+
if (SubscriptionHelper.validate(this.s, s)) {
81+
this.s = s;
82+
actual.onSubscribe(this);
83+
s.request(Long.MAX_VALUE);
84+
}
85+
}
86+
87+
@Override
88+
public void onNext(T t) {
89+
value.add(t);
90+
}
91+
92+
@Override
93+
public void onError(Throwable t) {
94+
value = null;
95+
s = SubscriptionHelper.CANCELLED;
96+
actual.onError(t);
97+
}
98+
99+
@Override
100+
public void onComplete() {
101+
s = SubscriptionHelper.CANCELLED;
102+
actual.onSuccess(value);
103+
}
104+
105+
@Override
106+
public void dispose() {
107+
s.cancel();
108+
s = SubscriptionHelper.CANCELLED;
109+
}
110+
111+
@Override
112+
public boolean isDisposed() {
113+
return s == SubscriptionHelper.CANCELLED;
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)