Skip to content

Commit 3b5dd7e

Browse files
committed
collect - prevent multiple terminal events
1 parent 888560e commit 3b5dd7e

File tree

7 files changed

+275
-61
lines changed

7 files changed

+275
-61
lines changed

src/main/java/rx/internal/operators/DeferredScalarSubscriber.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.concurrent.atomic.AtomicInteger;
2020

2121
import rx.*;
22+
import rx.plugins.RxJavaHooks;
2223

2324
/**
2425
* Base class for Subscribers that consume the entire upstream and signal
@@ -56,19 +57,31 @@ public abstract class DeferredScalarSubscriber<T, R> extends Subscriber<T> {
5657
/** Value will be emitted. */
5758
static final int HAS_REQUEST_HAS_VALUE = 3;
5859

60+
/** prevents multiple terminal emissions. */
61+
protected boolean done;
62+
5963
public DeferredScalarSubscriber(Subscriber<? super R> actual) {
6064
this.actual = actual;
6165
this.state = new AtomicInteger();
6266
}
6367

6468
@Override
6569
public void onError(Throwable ex) {
66-
value = null;
67-
actual.onError(ex);
70+
if (!done) {
71+
done = true;
72+
value = null;
73+
actual.onError(ex);
74+
} else {
75+
RxJavaHooks.onError(ex);
76+
}
6877
}
6978

7079
@Override
7180
public void onCompleted() {
81+
if (done) {
82+
return;
83+
}
84+
done = true;
7285
if (hasValue) {
7386
complete(value);
7487
} else {

src/main/java/rx/internal/operators/OnSubscribeCollect.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,15 @@ public CollectSubscriber(Subscriber<? super R> actual, R initialValue, Action2<R
6363

6464
@Override
6565
public void onNext(T t) {
66+
if (done) {
67+
return;
68+
}
6669
try {
6770
collector.call(value, t);
6871
} catch (Throwable ex) {
6972
Exceptions.throwIfFatal(ex);
7073
unsubscribe();
71-
actual.onError(ex);
74+
onError(ex);
7275
}
7376
}
7477

src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -939,62 +939,7 @@ public void testRangeWithScheduler() {
939939
inOrder.verifyNoMoreInteractions();
940940
}
941941

942-
@Test
943-
public void testCollectToList() {
944-
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
945-
946-
@Override
947-
public List<Integer> call() {
948-
return new ArrayList<Integer>();
949-
}
950-
951-
}, new Action2<List<Integer>, Integer>() {
952-
953-
@Override
954-
public void call(List<Integer> list, Integer v) {
955-
list.add(v);
956-
}
957-
});
958942

959-
List<Integer> list = o.toBlocking().last();
960-
961-
assertEquals(3, list.size());
962-
assertEquals(1, list.get(0).intValue());
963-
assertEquals(2, list.get(1).intValue());
964-
assertEquals(3, list.get(2).intValue());
965-
966-
// test multiple subscribe
967-
List<Integer> list2 = o.toBlocking().last();
968-
969-
assertEquals(3, list2.size());
970-
assertEquals(1, list2.get(0).intValue());
971-
assertEquals(2, list2.get(1).intValue());
972-
assertEquals(3, list2.get(2).intValue());
973-
}
974-
975-
@Test
976-
public void testCollectToString() {
977-
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
978-
979-
@Override
980-
public StringBuilder call() {
981-
return new StringBuilder();
982-
}
983-
984-
}, new Action2<StringBuilder, Integer>() {
985-
986-
@Override
987-
public void call(StringBuilder sb, Integer v) {
988-
if (sb.length() > 0) {
989-
sb.append("-");
990-
}
991-
sb.append(v);
992-
}
993-
}).toBlocking().last().toString();
994-
995-
assertEquals("1-2-3", value);
996-
}
997-
998943
@Test
999944
public void testMergeWith() {
1000945
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
20+
21+
import java.util.ArrayList;
22+
import java.util.Arrays;
23+
import java.util.List;
24+
import java.util.concurrent.CopyOnWriteArrayList;
25+
import java.util.concurrent.atomic.AtomicBoolean;
26+
27+
import org.junit.Assert;
28+
import org.junit.Test;
29+
30+
import rx.Observable;
31+
import rx.Producer;
32+
import rx.Subscriber;
33+
import rx.Observable.OnSubscribe;
34+
import rx.functions.Action1;
35+
import rx.functions.Action2;
36+
import rx.functions.Func0;
37+
import rx.observers.TestSubscriber;
38+
import rx.plugins.RxJavaHooks;
39+
40+
public class OnSubscribeCollectTest {
41+
42+
@Test
43+
public void testCollectToList() {
44+
Observable<List<Integer>> o = Observable.just(1, 2, 3).collect(new Func0<List<Integer>>() {
45+
46+
@Override
47+
public List<Integer> call() {
48+
return new ArrayList<Integer>();
49+
}
50+
51+
}, new Action2<List<Integer>, Integer>() {
52+
53+
@Override
54+
public void call(List<Integer> list, Integer v) {
55+
list.add(v);
56+
}
57+
});
58+
59+
List<Integer> list = o.toBlocking().last();
60+
61+
assertEquals(3, list.size());
62+
assertEquals(1, list.get(0).intValue());
63+
assertEquals(2, list.get(1).intValue());
64+
assertEquals(3, list.get(2).intValue());
65+
66+
// test multiple subscribe
67+
List<Integer> list2 = o.toBlocking().last();
68+
69+
assertEquals(3, list2.size());
70+
assertEquals(1, list2.get(0).intValue());
71+
assertEquals(2, list2.get(1).intValue());
72+
assertEquals(3, list2.get(2).intValue());
73+
}
74+
75+
@Test
76+
public void testCollectToString() {
77+
String value = Observable.just(1, 2, 3).collect(new Func0<StringBuilder>() {
78+
79+
@Override
80+
public StringBuilder call() {
81+
return new StringBuilder();
82+
}
83+
84+
}, new Action2<StringBuilder, Integer>() {
85+
86+
@Override
87+
public void call(StringBuilder sb, Integer v) {
88+
if (sb.length() > 0) {
89+
sb.append("-");
90+
}
91+
sb.append(v);
92+
}
93+
}).toBlocking().last().toString();
94+
95+
assertEquals("1-2-3", value);
96+
}
97+
98+
@Test
99+
public void testFactoryFailureResultsInErrorEmission() {
100+
TestSubscriber<Object> ts = TestSubscriber.create();
101+
final RuntimeException e = new RuntimeException();
102+
Observable.just(1).collect(new Func0<List<Integer>>() {
103+
104+
@Override
105+
public List<Integer> call() {
106+
throw e;
107+
}
108+
}, new Action2<List<Integer>, Integer>() {
109+
110+
@Override
111+
public void call(List<Integer> list, Integer t) {
112+
list.add(t);
113+
}
114+
}).subscribe(ts);
115+
ts.assertNoValues();
116+
ts.assertError(e);
117+
ts.assertNotCompleted();
118+
}
119+
120+
@Test
121+
public void testCollectorFailureDoesNotResultInTwoErrorEmissions() {
122+
try {
123+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
124+
RxJavaHooks.setOnError(new Action1<Throwable>() {
125+
126+
@Override
127+
public void call(Throwable t) {
128+
list.add(t);
129+
}
130+
});
131+
final RuntimeException e1 = new RuntimeException();
132+
final RuntimeException e2 = new RuntimeException();
133+
TestSubscriber<List<Integer>> ts = TestSubscriber.create();
134+
Observable.create(new OnSubscribe<Integer>() {
135+
136+
@Override
137+
public void call(final Subscriber<? super Integer> sub) {
138+
sub.setProducer(new Producer() {
139+
140+
@Override
141+
public void request(long n) {
142+
if (n > 0) {
143+
sub.onNext(1);
144+
sub.onError(e2);
145+
}
146+
}
147+
});
148+
}
149+
}).collect(new Func0<List<Integer>>() {
150+
151+
@Override
152+
public List<Integer> call() {
153+
return new ArrayList<Integer>();
154+
}
155+
}, //
156+
new Action2<List<Integer>, Integer>() {
157+
158+
@Override
159+
public void call(List<Integer> t1, Integer t2) {
160+
throw e1;
161+
}
162+
}).unsafeSubscribe(ts);
163+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
164+
ts.assertNotCompleted();
165+
assertEquals(Arrays.asList(e2), list);
166+
} finally {
167+
RxJavaHooks.reset();
168+
}
169+
}
170+
171+
@Test
172+
public void testCollectorFailureDoesNotResultInErrorAndCompletedEmissions() {
173+
final RuntimeException e1 = new RuntimeException();
174+
TestSubscriber<List<Integer>> ts = TestSubscriber.create();
175+
Observable.create(new OnSubscribe<Integer>() {
176+
177+
@Override
178+
public void call(final Subscriber<? super Integer> sub) {
179+
sub.setProducer(new Producer() {
180+
181+
@Override
182+
public void request(long n) {
183+
if (n > 0) {
184+
sub.onNext(1);
185+
sub.onCompleted();
186+
}
187+
}
188+
});
189+
}
190+
}).collect(new Func0<List<Integer>>() {
191+
192+
@Override
193+
public List<Integer> call() {
194+
return new ArrayList<Integer>();
195+
}
196+
}, //
197+
new Action2<List<Integer>, Integer>() {
198+
199+
@Override
200+
public void call(List<Integer> t1, Integer t2) {
201+
throw e1;
202+
}
203+
}).unsafeSubscribe(ts);
204+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
205+
ts.assertNotCompleted();
206+
}
207+
208+
@Test
209+
public void testCollectorFailureDoesNotResultInErrorAndOnNextEmissions() {
210+
final RuntimeException e1 = new RuntimeException();
211+
TestSubscriber<List<Integer>> ts = TestSubscriber.create();
212+
final AtomicBoolean added = new AtomicBoolean();
213+
Observable.create(new OnSubscribe<Integer>() {
214+
215+
@Override
216+
public void call(final Subscriber<? super Integer> sub) {
217+
sub.setProducer(new Producer() {
218+
219+
@Override
220+
public void request(long n) {
221+
if (n > 0) {
222+
sub.onNext(1);
223+
sub.onNext(2);
224+
}
225+
}
226+
});
227+
}
228+
}).collect(new Func0<List<Integer>>() {
229+
230+
@Override
231+
public List<Integer> call() {
232+
return new ArrayList<Integer>();
233+
}
234+
}, //
235+
new Action2<List<Integer>, Integer>() {
236+
boolean once = true;
237+
@Override
238+
public void call(List<Integer> list, Integer t) {
239+
if (once) {
240+
once = false;
241+
throw e1;
242+
} else {
243+
added.set(true);
244+
}
245+
}
246+
}).unsafeSubscribe(ts);
247+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
248+
ts.assertNoValues();
249+
ts.assertNotCompleted();
250+
assertFalse(added.get());
251+
}
252+
253+
}

src/test/java/rx/internal/operators/OnSubscribeReduceTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public Integer call(Integer a, Integer b) {
255255
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
256256
assertEquals(Arrays.asList(e2), list);
257257
} finally {
258-
RxJavaHooks.setOnError(null);
258+
RxJavaHooks.reset();
259259
}
260260
}
261261

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,7 @@ public Boolean call(Integer t) {
289289
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
290290
assertEquals(Arrays.asList(e2), list);
291291
} finally {
292-
RxJavaHooks.setOnError(null);
292+
RxJavaHooks.reset();
293293
}
294294
}
295295
}

0 commit comments

Comments
 (0)