Skip to content

Commit da88837

Browse files
davidmotenakarnokd
authored andcommitted
2.x collect - handle post-terminal events properly (#4364)
1 parent 7a1a4af commit da88837

File tree

4 files changed

+326
-54
lines changed

4 files changed

+326
-54
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableCollect.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.reactivex.exceptions.Exceptions;
2020
import io.reactivex.functions.BiConsumer;
2121
import io.reactivex.internal.subscriptions.*;
22+
import io.reactivex.plugins.RxJavaPlugins;
2223

2324
public final class FlowableCollect<T, U> extends AbstractFlowableWithUpstream<T, U> {
2425

@@ -58,6 +59,8 @@ static final class CollectSubscriber<T, U> extends DeferredScalarSubscription<U>
5859

5960
Subscription s;
6061

62+
boolean done;
63+
6164
public CollectSubscriber(Subscriber<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
6265
super(actual);
6366
this.collector = collector;
@@ -75,22 +78,34 @@ public void onSubscribe(Subscription s) {
7578

7679
@Override
7780
public void onNext(T t) {
81+
if (done) {
82+
return;
83+
}
7884
try {
7985
collector.accept(u, t);
8086
} catch (Throwable e) {
8187
Exceptions.throwIfFatal(e);
8288
s.cancel();
83-
actual.onError(e);
89+
onError(e);
8490
}
8591
}
8692

8793
@Override
8894
public void onError(Throwable t) {
95+
if (done) {
96+
RxJavaPlugins.onError(t);
97+
return;
98+
}
99+
done = true;
89100
actual.onError(t);
90101
}
91102

92103
@Override
93104
public void onComplete() {
105+
if (done) {
106+
return;
107+
}
108+
done = true;
94109
complete(u);
95110
}
96111

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.flowable;
14+
15+
import java.util.Arrays;
16+
import java.util.List;
17+
import java.util.Queue;
18+
import java.util.concurrent.ConcurrentLinkedQueue;
19+
import java.util.concurrent.atomic.AtomicLong;
20+
21+
import org.reactivestreams.Subscriber;
22+
import org.reactivestreams.Subscription;
23+
24+
import io.reactivex.Flowable;
25+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
26+
import io.reactivex.internal.util.BackpressureHelper;
27+
28+
/**
29+
* Creates {@link Flowable} of a number of items followed by either an error or
30+
* completion. Cancellation has no effect on preventing emissions until the
31+
* currently outstanding requests have been met.
32+
*/
33+
public final class Burst<T> extends Flowable<T> {
34+
35+
private final List<T> items;
36+
private final Throwable error;
37+
38+
private Burst(Throwable error, List<T> items) {
39+
if (items.isEmpty()) {
40+
throw new IllegalArgumentException("items cannot be empty");
41+
}
42+
for (T item : items) {
43+
if (item == null) {
44+
throw new IllegalArgumentException("items cannot include null");
45+
}
46+
}
47+
this.error = error;
48+
this.items = items;
49+
}
50+
51+
@Override
52+
protected void subscribeActual(final Subscriber<? super T> subscriber) {
53+
subscriber.onSubscribe(new Subscription() {
54+
55+
final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
56+
final AtomicLong requested = new AtomicLong();
57+
volatile boolean cancelled = false;
58+
59+
@Override
60+
public void request(long n) {
61+
if (cancelled) {
62+
// required by reactive-streams-jvm 3.6
63+
return;
64+
}
65+
if (SubscriptionHelper.validate(n)) {
66+
// just for testing, don't care about perf
67+
// so no attempt made to reduce volatile reads
68+
if (BackpressureHelper.add(requested, n) == 0) {
69+
if (q.isEmpty())
70+
return;
71+
while (!q.isEmpty() && requested.get() > 0) {
72+
T item = q.poll();
73+
requested.decrementAndGet();
74+
subscriber.onNext(item);
75+
}
76+
if (q.isEmpty()) {
77+
if (error != null) {
78+
subscriber.onError(error);
79+
} else {
80+
subscriber.onComplete();
81+
}
82+
}
83+
}
84+
}
85+
}
86+
87+
@Override
88+
public void cancel() {
89+
cancelled = true;
90+
}
91+
});
92+
93+
}
94+
95+
@SuppressWarnings("unchecked")
96+
public static <T> Builder<T> item(T item) {
97+
return items(item);
98+
}
99+
100+
public static <T> Builder<T> items(T... items) {
101+
return new Builder<T>(Arrays.asList(items));
102+
}
103+
104+
public static final class Builder<T> {
105+
106+
private final List<T> items;
107+
private Throwable error = null;
108+
109+
private Builder(List<T> items) {
110+
this.items = items;
111+
}
112+
113+
public Flowable<T> error(Throwable e) {
114+
this.error = e;
115+
return create();
116+
}
117+
118+
public Flowable<T> create() {
119+
return new Burst<T>(error, items);
120+
}
121+
122+
}
123+
124+
}
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
package io.reactivex.flowable;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertFalse;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.List;
9+
import java.util.concurrent.Callable;
10+
import java.util.concurrent.CopyOnWriteArrayList;
11+
import java.util.concurrent.atomic.AtomicBoolean;
12+
13+
import org.junit.Test;
14+
15+
import io.reactivex.Flowable;
16+
import io.reactivex.functions.BiConsumer;
17+
import io.reactivex.functions.Consumer;
18+
import io.reactivex.plugins.RxJavaPlugins;
19+
20+
public class FlowableCollectTest {
21+
22+
@Test
23+
public void testCollectToList() {
24+
Flowable<List<Integer>> o = Flowable.just(1, 2, 3)
25+
.collect(new Callable<List<Integer>>() {
26+
@Override
27+
public List<Integer> call() {
28+
return new ArrayList<Integer>();
29+
}
30+
}, new BiConsumer<List<Integer>, Integer>() {
31+
@Override
32+
public void accept(List<Integer> list, Integer v) {
33+
list.add(v);
34+
}
35+
});
36+
37+
List<Integer> list = o.blockingLast();
38+
39+
assertEquals(3, list.size());
40+
assertEquals(1, list.get(0).intValue());
41+
assertEquals(2, list.get(1).intValue());
42+
assertEquals(3, list.get(2).intValue());
43+
44+
// test multiple subscribe
45+
List<Integer> list2 = o.blockingLast();
46+
47+
assertEquals(3, list2.size());
48+
assertEquals(1, list2.get(0).intValue());
49+
assertEquals(2, list2.get(1).intValue());
50+
assertEquals(3, list2.get(2).intValue());
51+
}
52+
53+
@Test
54+
public void testCollectToString() {
55+
String value = Flowable.just(1, 2, 3)
56+
.collect(
57+
new Callable<StringBuilder>() {
58+
@Override
59+
public StringBuilder call() {
60+
return new StringBuilder();
61+
}
62+
},
63+
new BiConsumer<StringBuilder, Integer>() {
64+
@Override
65+
public void accept(StringBuilder sb, Integer v) {
66+
if (sb.length() > 0) {
67+
sb.append("-");
68+
}
69+
sb.append(v);
70+
}
71+
}).blockingLast().toString();
72+
73+
assertEquals("1-2-3", value);
74+
}
75+
76+
77+
@Test
78+
public void testFactoryFailureResultsInErrorEmission() {
79+
final RuntimeException e = new RuntimeException();
80+
Flowable.just(1).collect(new Callable<List<Integer>>() {
81+
82+
@Override
83+
public List<Integer> call() throws Exception {
84+
throw e;
85+
}
86+
}, new BiConsumer<List<Integer>, Integer>() {
87+
88+
@Override
89+
public void accept(List<Integer> list, Integer t) {
90+
list.add(t);
91+
}
92+
})
93+
.test()
94+
.assertNoValues()
95+
.assertError(e)
96+
.assertNotComplete();
97+
}
98+
99+
@Test
100+
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
101+
try {
102+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
103+
RxJavaPlugins.setErrorHandler(addToList(list));
104+
final RuntimeException e1 = new RuntimeException();
105+
final RuntimeException e2 = new RuntimeException();
106+
107+
Burst.items(1).error(e2) //
108+
.collect(callableListCreator(), biConsumerThrows(e1)) //
109+
.test() //
110+
.assertError(e1) //
111+
.assertNotComplete();
112+
assertEquals(Arrays.asList(e2), list);
113+
} finally {
114+
RxJavaPlugins.reset();
115+
}
116+
}
117+
118+
@Test
119+
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
120+
final RuntimeException e = new RuntimeException();
121+
Burst.item(1).create() //
122+
.collect(callableListCreator(), biConsumerThrows(e)) //
123+
.test() //
124+
.assertError(e) //
125+
.assertNotComplete();
126+
}
127+
128+
@Test
129+
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
130+
final RuntimeException e = new RuntimeException();
131+
final AtomicBoolean added = new AtomicBoolean();
132+
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
133+
134+
boolean once = true;
135+
136+
@Override
137+
public void accept(Object o, Integer t) {
138+
if (once) {
139+
once = false;
140+
throw e;
141+
} else {
142+
added.set(true);
143+
}
144+
}
145+
};
146+
Burst.items(1, 2).create() //
147+
.collect(callableListCreator(), throwOnFirstOnly)//
148+
.test() //
149+
.assertError(e) //
150+
.assertNoValues() //
151+
.assertNotComplete();
152+
assertFalse(added.get());
153+
}
154+
155+
private static Consumer<Throwable> addToList(final List<Throwable> list) {
156+
return new Consumer<Throwable>() {
157+
158+
@Override
159+
public void accept(Throwable t) {
160+
list.add(t);
161+
}
162+
};
163+
}
164+
165+
private static <T> Callable<List<T>> callableListCreator() {
166+
return new Callable<List<T>>() {
167+
168+
@Override
169+
public List<T> call() {
170+
return new ArrayList<T>();
171+
}
172+
};
173+
}
174+
175+
private static <T> BiConsumer<Object, T> biConsumerThrows(final RuntimeException e) {
176+
return new BiConsumer<Object, T>() {
177+
178+
@Override
179+
public void accept(Object t1, T t2) {
180+
throw e;
181+
}
182+
};
183+
}
184+
185+
}

0 commit comments

Comments
 (0)