Skip to content

Commit 6dbeff4

Browse files
davidmotenakarnokd
authored andcommitted
2.x - collect - handle post-terminal events for Observable (#4428)
1 parent d37bb20 commit 6dbeff4

File tree

6 files changed

+292
-85
lines changed

6 files changed

+292
-85
lines changed

src/main/java/io/reactivex/internal/operators/observable/ObservableCollect.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.reactivex.disposables.Disposable;
1919
import io.reactivex.functions.BiConsumer;
2020
import io.reactivex.internal.disposables.*;
21+
import io.reactivex.plugins.RxJavaPlugins;
2122

2223
public final class ObservableCollect<T, U> extends AbstractObservableWithUpstream<T, U> {
2324
final Callable<? extends U> initialSupplier;
@@ -56,6 +57,8 @@ static final class CollectSubscriber<T, U> implements Observer<T>, Disposable {
5657

5758
Disposable s;
5859

60+
boolean done;
61+
5962
public CollectSubscriber(Observer<? super U> actual, U u, BiConsumer<? super U, ? super T> collector) {
6063
this.actual = actual;
6164
this.collector = collector;
@@ -84,21 +87,33 @@ public boolean isDisposed() {
8487

8588
@Override
8689
public void onNext(T t) {
90+
if (done) {
91+
return;
92+
}
8793
try {
8894
collector.accept(u, t);
8995
} catch (Throwable e) {
9096
s.dispose();
91-
actual.onError(e);
97+
onError(e);
9298
}
9399
}
94100

95101
@Override
96102
public void onError(Throwable t) {
103+
if (done) {
104+
RxJavaPlugins.onError(t);
105+
return;
106+
}
107+
done = true;
97108
actual.onError(t);
98109
}
99110

100111
@Override
101112
public void onComplete() {
113+
if (done) {
114+
return;
115+
}
116+
done = true;
102117
actual.onNext(u);
103118
actual.onComplete();
104119
}

src/test/java/io/reactivex/flowable/FlowableCollectTest.java

Lines changed: 4 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@
1313

1414
package io.reactivex.flowable;
1515

16+
import static io.reactivex.internal.util.TestingHelper.addToList;
17+
import static io.reactivex.internal.util.TestingHelper.biConsumerThrows;
18+
import static io.reactivex.internal.util.TestingHelper.callableListCreator;
1619
import static org.junit.Assert.assertEquals;
1720
import static org.junit.Assert.assertFalse;
1821

@@ -27,10 +30,9 @@
2730

2831
import io.reactivex.Flowable;
2932
import io.reactivex.functions.BiConsumer;
30-
import io.reactivex.functions.Consumer;
3133
import io.reactivex.plugins.RxJavaPlugins;
3234

33-
public class FlowableCollectTest {
35+
public final class FlowableCollectTest {
3436

3537
@Test
3638
public void testCollectToList() {
@@ -165,34 +167,4 @@ public void accept(Object o, Integer t) {
165167
assertFalse(added.get());
166168
}
167169

168-
private static Consumer<Throwable> addToList(final List<Throwable> list) {
169-
return new Consumer<Throwable>() {
170-
171-
@Override
172-
public void accept(Throwable t) {
173-
list.add(t);
174-
}
175-
};
176-
}
177-
178-
private static <T> Callable<List<T>> callableListCreator() {
179-
return new Callable<List<T>>() {
180-
181-
@Override
182-
public List<T> call() {
183-
return new ArrayList<T>();
184-
}
185-
};
186-
}
187-
188-
private static <T> BiConsumer<Object, T> biConsumerThrows(final RuntimeException e) {
189-
return new BiConsumer<Object, T>() {
190-
191-
@Override
192-
public void accept(Object t1, T t2) {
193-
throw e;
194-
}
195-
};
196-
}
197-
198170
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex.internal.operators.observable;
14+
15+
import java.util.Arrays;
16+
import java.util.List;
17+
18+
import io.reactivex.Observable;
19+
import io.reactivex.Observer;
20+
import io.reactivex.disposables.Disposables;
21+
22+
/**
23+
* Creates {@link Observable} of a number of items followed by either an error or
24+
* completion. Subscription status is not checked before emitting an event.
25+
*
26+
* @param <T> the value type
27+
*/
28+
public final class Burst<T> extends Observable<T> {
29+
30+
private final List<T> items;
31+
private final Throwable error;
32+
33+
private Burst(Throwable error, List<T> items) {
34+
this.error = error;
35+
this.items = items;
36+
}
37+
38+
@Override
39+
protected void subscribeActual(final Observer<? super T> observer) {
40+
observer.onSubscribe(Disposables.empty());
41+
for (T item: items) {
42+
observer.onNext(item);
43+
}
44+
if (error != null) {
45+
observer.onError(error);
46+
} else {
47+
observer.onComplete();
48+
}
49+
}
50+
51+
@SuppressWarnings("unchecked")
52+
public static <T> Builder<T> item(T item) {
53+
return items(item);
54+
}
55+
56+
public static <T> Builder<T> items(T... items) {
57+
return new Builder<T>(Arrays.asList(items));
58+
}
59+
60+
public static final class Builder<T> {
61+
62+
private final List<T> items;
63+
private Throwable error = null;
64+
65+
private Builder(List<T> items) {
66+
this.items = items;
67+
}
68+
69+
public Observable<T> error(Throwable e) {
70+
this.error = e;
71+
return create();
72+
}
73+
74+
public Observable<T> create() {
75+
return new Burst<T>(error, items);
76+
}
77+
78+
}
79+
80+
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package io.reactivex.internal.operators.observable;
2+
3+
import static io.reactivex.internal.util.TestingHelper.addToList;
4+
import static io.reactivex.internal.util.TestingHelper.biConsumerThrows;
5+
import static io.reactivex.internal.util.TestingHelper.callableListCreator;
6+
import static org.junit.Assert.assertEquals;
7+
import static org.junit.Assert.assertFalse;
8+
9+
import java.util.ArrayList;
10+
import java.util.Arrays;
11+
import java.util.List;
12+
import java.util.concurrent.Callable;
13+
import java.util.concurrent.CopyOnWriteArrayList;
14+
import java.util.concurrent.atomic.AtomicBoolean;
15+
16+
import org.junit.Test;
17+
18+
import io.reactivex.Observable;
19+
import io.reactivex.functions.BiConsumer;
20+
import io.reactivex.plugins.RxJavaPlugins;
21+
22+
public final class ObservableCollectTest {
23+
24+
@Test
25+
public void testCollectToList() {
26+
Observable<List<Integer>> o = Observable.just(1, 2, 3)
27+
.collect(new Callable<List<Integer>>() {
28+
@Override
29+
public List<Integer> call() {
30+
return new ArrayList<Integer>();
31+
}
32+
}, new BiConsumer<List<Integer>, Integer>() {
33+
@Override
34+
public void accept(List<Integer> list, Integer v) {
35+
list.add(v);
36+
}
37+
});
38+
39+
List<Integer> list = o.blockingLast();
40+
41+
assertEquals(3, list.size());
42+
assertEquals(1, list.get(0).intValue());
43+
assertEquals(2, list.get(1).intValue());
44+
assertEquals(3, list.get(2).intValue());
45+
46+
// test multiple subscribe
47+
List<Integer> list2 = o.blockingLast();
48+
49+
assertEquals(3, list2.size());
50+
assertEquals(1, list2.get(0).intValue());
51+
assertEquals(2, list2.get(1).intValue());
52+
assertEquals(3, list2.get(2).intValue());
53+
}
54+
55+
@Test
56+
public void testCollectToString() {
57+
String value = Observable.just(1, 2, 3).collect(new Callable<StringBuilder>() {
58+
@Override
59+
public StringBuilder call() {
60+
return new StringBuilder();
61+
}
62+
},
63+
new BiConsumer<StringBuilder, Integer>() {
64+
@Override
65+
public void accept(StringBuilder sb, Integer v) {
66+
if (sb.length() > 0) {
67+
sb.append("-");
68+
}
69+
sb.append(v);
70+
}
71+
}).blockingLast().toString();
72+
73+
assertEquals("1-2-3", value);
74+
}
75+
76+
@Test
77+
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
78+
try {
79+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
80+
RxJavaPlugins.setErrorHandler(addToList(list));
81+
final RuntimeException e1 = new RuntimeException();
82+
final RuntimeException e2 = new RuntimeException();
83+
84+
Burst.items(1).error(e2) //
85+
.collect(callableListCreator(), biConsumerThrows(e1)) //
86+
.test() //
87+
.assertError(e1) //
88+
.assertNotComplete();
89+
assertEquals(Arrays.asList(e2), list);
90+
} finally {
91+
RxJavaPlugins.reset();
92+
}
93+
}
94+
95+
@Test
96+
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
97+
final RuntimeException e = new RuntimeException();
98+
Burst.item(1).create() //
99+
.collect(callableListCreator(), biConsumerThrows(e)) //
100+
.test() //
101+
.assertError(e) //
102+
.assertNotComplete();
103+
}
104+
105+
@Test
106+
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
107+
final RuntimeException e = new RuntimeException();
108+
final AtomicBoolean added = new AtomicBoolean();
109+
BiConsumer<Object, Integer> throwOnFirstOnly = new BiConsumer<Object, Integer>() {
110+
111+
boolean once = true;
112+
113+
@Override
114+
public void accept(Object o, Integer t) {
115+
if (once) {
116+
once = false;
117+
throw e;
118+
} else {
119+
added.set(true);
120+
}
121+
}
122+
};
123+
Burst.items(1, 2).create() //
124+
.collect(callableListCreator(), throwOnFirstOnly)//
125+
.test() //
126+
.assertError(e) //
127+
.assertNoValues() //
128+
.assertNotComplete();
129+
assertFalse(added.get());
130+
}
131+
132+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package io.reactivex.internal.util;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.concurrent.Callable;
21+
22+
import io.reactivex.functions.BiConsumer;
23+
import io.reactivex.functions.Consumer;
24+
25+
public final class TestingHelper {
26+
27+
private TestingHelper() {
28+
// prevent instantiation
29+
}
30+
31+
public static <T> Consumer<T> addToList(final List<T> list) {
32+
return new Consumer<T>() {
33+
34+
@Override
35+
public void accept(T t) {
36+
list.add(t);
37+
}
38+
};
39+
}
40+
41+
public static <T> Callable<List<T>> callableListCreator() {
42+
return new Callable<List<T>>() {
43+
44+
@Override
45+
public List<T> call() {
46+
return new ArrayList<T>();
47+
}
48+
};
49+
}
50+
51+
public static BiConsumer<Object, Object> biConsumerThrows(final RuntimeException e) {
52+
return new BiConsumer<Object, Object>() {
53+
54+
@Override
55+
public void accept(Object t1, Object t2) {
56+
throw e;
57+
}
58+
};
59+
}
60+
}

0 commit comments

Comments
 (0)