Skip to content

Commit aba5f4a

Browse files
committed
2.x collect - handle post-terminal events properly
1 parent 5b8845d commit aba5f4a

File tree

4 files changed

+327
-54
lines changed

4 files changed

+327
-54
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.BiConsumer;
2121
import io.reactivex.internal.subscriptions.*;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {
2425

@@ -58,6 +59,8 @@ static final class CollectSubscriber<T, U> extends DeferredScalarSubscription<U>
5859

5960
Subscription s;
6061

62+
boolean done;
63+
6164
public CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
6265
super(actual);
6366
this.collector = collector;
@@ -75,22 +78,34 @@ public void onSubscribe(Subscription s) {
7578

7679
@Override
7780
public void onNext(T t) {
81+
if (done) {
82+
return;
83+
}
7884
try {
7985
collector.accept(u, t);
8086
} catch (Throwable e) {
8187
Exceptions.throwIfFatal(e);
8288
s.cancel();
83-
actual.onError(e);
89+
onError(e);
8490
}
8591
}
8692

8793
@Override
8894
public void onError(Throwable t) {
95+
if (done) {
96+
RxJavaPlugins.onError(t);
97+
return;
98+
}
99+
done = true;
89100
actual.onError(t);
90101
}
91102

92103
@Override
93104
public void onComplete() {
105+
if (done) {
106+
return;
107+
}
108+
done = true;
94109
complete(u);
95110
}
96111

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.flowable;
14+
15+
import java.util.Arrays;
16+
import java.util.List;
17+
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.Optional;
26+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
27+
import io.reactivex.internal.util.BackpressureHelper;
28+
29+
/**
30+
* Creates {@link Flowable} of a number of items followed by either an error or
31+
* completion. Cancellation has no effect on preventing emissions until the
32+
* currently outstanding requests have been met.
33+
*/
34+
public final class Burst<T> extends Flowable<T> {
35+
36+
private final List<T> items;
37+
private final Optional<Throwable> error;
38+
39+
private Burst(Optional<Throwable> error, List<T> items) {
40+
if (items.isEmpty()) {
41+
throw new IllegalArgumentException("items cannot be empty");
42+
}
43+
for (T item : items) {
44+
if (item == null) {
45+
throw new IllegalArgumentException("items cannot include null");
46+
}
47+
}
48+
this.error = error;
49+
this.items = items;
50+
}
51+
52+
@Override
53+
protected void subscribeActual(final Subscriber<? super T> subscriber) {
54+
subscriber.onSubscribe(new Subscription() {
55+
56+
final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
57+
final AtomicLong requested = new AtomicLong();
58+
volatile boolean cancelled = false;
59+
60+
@Override
61+
public void request(long n) {
62+
if (cancelled) {
63+
// required by reactive-streams-jvm 3.6
64+
return;
65+
}
66+
if (SubscriptionHelper.validate(n)) {
67+
// just for testing, don't care about perf
68+
// so no attempt made to reduce volatile reads
69+
if (BackpressureHelper.add(requested, n) == 0) {
70+
if (q.isEmpty())
71+
return;
72+
while (!q.isEmpty() && requested.get() > 0) {
73+
T item = q.poll();
74+
requested.decrementAndGet();
75+
subscriber.onNext(item);
76+
}
77+
if (q.isEmpty()) {
78+
if (error.isPresent()) {
79+
subscriber.onError(error.get());
80+
} else {
81+
subscriber.onComplete();
82+
}
83+
}
84+
}
85+
}
86+
}
87+
88+
@Override
89+
public void cancel() {
90+
cancelled = true;
91+
}
92+
});
93+
94+
}
95+
96+
@SuppressWarnings("unchecked")
97+
public static <T> Builder<T> item(T item) {
98+
return items(item);
99+
}
100+
101+
public static <T> Builder<T> items(T... items) {
102+
return new Builder<T>(Arrays.asList(items));
103+
}
104+
105+
public static final class Builder<T> {
106+
107+
private final List<T> items;
108+
private Optional<Throwable> error = Optional.empty();
109+
110+
private Builder(List<T> items) {
111+
this.items = items;
112+
}
113+
114+
public Flowable<T> error(Throwable e) {
115+
this.error = Optional.of(e);
116+
return create();
117+
}
118+
119+
public Flowable<T> create() {
120+
return new Burst<T>(error, items);
121+
}
122+
123+
}
124+
125+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package io.reactivex.flowable;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.Callable;
10+
import java.util.concurrent.CopyOnWriteArrayList;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
13+
import org.junit.Test;
14+
15+
import io.reactivex.Flowable;
16+
import io.reactivex.functions.BiConsumer;
17+
import io.reactivex.functions.Consumer;
18+
import io.reactivex.plugins.RxJavaPlugins;
19+
20+
public class FlowableCollectTest {
21+
22+
@Test
23+
public void testCollectToList() {
24+
Flowable<List<Integer>> o = Flowable.just(1, 2, 3)
25+
.collect(new Callable<List<Integer>>() {
26+
@Override
27+
public List<Integer> call() {
28+
return new ArrayList<Integer>();
29+
}
30+
}, new BiConsumer<List<Integer>, Integer>() {
31+
@Override
32+
public void accept(List<Integer> list, Integer v) {
33+
list.add(v);
34+
}
35+
});
36+
37+
List<Integer> list = o.toBlocking().last();
38+
39+
assertEquals(3, list.size());
40+
assertEquals(1, list.get(0).intValue());
41+
assertEquals(2, list.get(1).intValue());
42+
assertEquals(3, list.get(2).intValue());
43+
44+
// test multiple subscribe
45+
List<Integer> list2 = o.toBlocking().last();
46+
47+
assertEquals(3, list2.size());
48+
assertEquals(1, list2.get(0).intValue());
49+
assertEquals(2, list2.get(1).intValue());
50+
assertEquals(3, list2.get(2).intValue());
51+
}
52+
53+
@Test
54+
public void testCollectToString() {
55+
String value = Flowable.just(1, 2, 3)
56+
.collect(
57+
new Callable<StringBuilder>() {
58+
@Override
59+
public StringBuilder call() {
60+
return new StringBuilder();
61+
}
62+
},
63+
new BiConsumer<StringBuilder, Integer>() {
64+
@Override
65+
public void accept(StringBuilder sb, Integer v) {
66+
if (sb.length() > 0) {
67+
sb.append("-");
68+
}
69+
sb.append(v);
70+
}
71+
}).toBlocking().last().toString();
72+
73+
assertEquals("1-2-3", value);
74+
}
75+
76+
77+
@Test
78+
public void testFactoryFailureResultsInErrorEmission() {
79+
final RuntimeException e = new RuntimeException();
80+
Flowable.just(1).collect(new Callable<List<Integer>>() {
81+
82+
@Override
83+
public List<Integer> call() throws Exception {
84+
throw e;
85+
}
86+
}, new BiConsumer<List<Integer>, Integer>() {
87+
88+
@Override
89+
public void accept(List<Integer> list, Integer t) {
90+
list.add(t);
91+
}
92+
})
93+
.test()
94+
.assertNoValues()
95+
.assertError(e)
96+
.assertNotComplete();
97+
}
98+
99+
@Test
100+
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
101+
try {
102+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
103+
RxJavaPlugins.setErrorHandler(addToList(list));
104+
final RuntimeException e1 = new RuntimeException();
105+
final RuntimeException e2 = new RuntimeException();
106+
107+
Burst.items(1).error(e2) //
108+
.collect(callableListCreator(), biConsumerThrows(e1)) //
109+
.test() //
110+
.assertError(e1) //
111+
.assertNotComplete();
112+
assertEquals(Arrays.asList(e2), list);
113+
} finally {
114+
RxJavaPlugins.reset();
115+
}
116+
}
117+
118+
@Test
119+
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
120+
final RuntimeException e = new RuntimeException();
121+
Burst.item(1).create() //
122+
.collect(callableListCreator(), biConsumerThrows(e)) //
123+
.test() //
124+
.assertError(e) //
125+
.assertNotComplete();
126+
}
127+
128+
@Test
129+
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
130+
final RuntimeException e = new RuntimeException();
131+
final AtomicBoolean added = new AtomicBoolean();
132+
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
133+
134+
boolean once = true;
135+
136+
@Override
137+
public void accept(Object o, Integer t) {
138+
if (once) {
139+
once = false;
140+
throw e;
141+
} else {
142+
added.set(true);
143+
}
144+
}
145+
};
146+
Burst.items(1, 2).create() //
147+
.collect(callableListCreator(), throwOnFirstOnly)//
148+
.test() //
149+
.assertError(e) //
150+
.assertNoValues() //
151+
.assertNotComplete();
152+
assertFalse(added.get());
153+
}
154+
155+
private static Consumer<Throwable> addToList(final List<Throwable> list) {
156+
return new Consumer<Throwable>() {
157+
158+
@Override
159+
public void accept(Throwable t) {
160+
list.add(t);
161+
}
162+
};
163+
}
164+
165+
private static <T> Callable<List<T>> callableListCreator() {
166+
return new Callable<List<T>>() {
167+
168+
@Override
169+
public List<T> call() {
170+
return new ArrayList<T>();
171+
}
172+
};
173+
}
174+
175+
private static <T> BiConsumer<Object, T> biConsumerThrows(final RuntimeException e) {
176+
return new BiConsumer<Object, T>() {
177+
178+
@Override
179+
public void accept(Object t1, T t2) {
180+
throw e;
181+
}
182+
};
183+
}
184+
185+
}

0 commit comments

Comments
 (0)