Skip to content

Commit 2a50c2f

Browse files
davidmotenakarnokd
authored andcommitted
collect - prevent multiple terminal events (#4252)
1 parent 888560e commit 2a50c2f

File tree

7 files changed

+318
-60
lines changed

7 files changed

+318
-60
lines changed
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import rx.Subscriber;
20+
import rx.plugins.RxJavaHooks;
21+
22+
/**
23+
* Supplements {@code DeferredScalarSubscriber} with defensive behaviour that ensures no emissions
24+
* occur after a terminal event. If {@code onError} is called more than once then errors after the first
25+
* are reported to {@code RxJavaHooks.onError}.
26+
*
27+
* @param <T> source value type
28+
* @param <R> result value type
29+
*/
30+
public abstract class DeferredScalarSubscriberSafe<T, R> extends DeferredScalarSubscriber<T,R> {
31+
32+
protected boolean done;
33+
34+
public DeferredScalarSubscriberSafe(Subscriber<? super R> actual) {
35+
super(actual);
36+
}
37+
38+
@Override
39+
public void onError(Throwable ex) {
40+
if (!done) {
41+
done = true;
42+
super.onError(ex);
43+
} else {
44+
RxJavaHooks.onError(ex);
45+
}
46+
}
47+
48+
@Override
49+
public void onCompleted() {
50+
if (done) {
51+
return;
52+
}
53+
done = true;
54+
super.onCompleted();
55+
}
56+
57+
}

src/main/java/rx/internal/operators/OnSubscribeCollect.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void call(Subscriber<? super R> t) {
5050
new CollectSubscriber<T, R>(t, initialValue, collector).subscribeTo(source);
5151
}
5252

53-
static final class CollectSubscriber<T, R> extends DeferredScalarSubscriber<T, R> {
53+
static final class CollectSubscriber<T, R> extends DeferredScalarSubscriberSafe<T, R> {
5454

5555
final Action2<R, ? super T> collector;
5656

@@ -63,12 +63,15 @@ public CollectSubscriber(Subscriber<? super R> actual, R initialValue, Action2<R
6363

6464
@Override
6565
public void onNext(T t) {
66+
if (done) {
67+
return;
68+
}
6669
try {
6770
collector.call(value, t);
6871
} catch (Throwable ex) {
6972
Exceptions.throwIfFatal(ex);
7073
unsubscribe();
71-
actual.onError(ex);
74+
onError(ex);
7275
}
7376
}
7477

src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -939,62 +939,7 @@ public void testRangeWithScheduler() {
939939
inOrder.verifyNoMoreInteractions();
940940
}
941941

942-
@Test
943-
public void testCollectToList() {
944-
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
945-
946-
@Override
947-
public List<Integer> call() {
948-
return new ArrayList<Integer>();
949-
}
950-
951-
}, new Action2<List<Integer>, Integer>() {
952-
953-
@Override
954-
public void call(List<Integer> list, Integer v) {
955-
list.add(v);
956-
}
957-
});
958942

959-
List<Integer> list = o.toBlocking().last();
960-
961-
assertEquals(3, list.size());
962-
assertEquals(1, list.get(0).intValue());
963-
assertEquals(2, list.get(1).intValue());
964-
assertEquals(3, list.get(2).intValue());
965-
966-
// test multiple subscribe
967-
List<Integer> list2 = o.toBlocking().last();
968-
969-
assertEquals(3, list2.size());
970-
assertEquals(1, list2.get(0).intValue());
971-
assertEquals(2, list2.get(1).intValue());
972-
assertEquals(3, list2.get(2).intValue());
973-
}
974-
975-
@Test
976-
public void testCollectToString() {
977-
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
978-
979-
@Override
980-
public StringBuilder call() {
981-
return new StringBuilder();
982-
}
983-
984-
}, new Action2<StringBuilder, Integer>() {
985-
986-
@Override
987-
public void call(StringBuilder sb, Integer v) {
988-
if (sb.length() > 0) {
989-
sb.append("-");
990-
}
991-
sb.append(v);
992-
}
993-
}).toBlocking().last().toString();
994-
995-
assertEquals("1-2-3", value);
996-
}
997-
998943
@Test
999944
public void testMergeWith() {
1000945
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

0 commit comments

Comments
 (0)