Skip to content

Commit e697309

Browse files
davidmotenakarnokd
authored andcommitted
OperatorAll - prevent multiple terminal events (#4244)
1 parent 7e913fa commit e697309

File tree

2 files changed

+125
-2
lines changed

2 files changed

+125
-2
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import rx.exceptions.Exceptions;
2121
import rx.functions.Func1;
2222
import rx.internal.producers.SingleDelayedProducer;
23+
import rx.plugins.RxJavaHooks;
2324

2425
/**
2526
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
@@ -43,14 +44,17 @@ public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
4344

4445
@Override
4546
public void onNext(T t) {
47+
if (done) {
48+
return;
49+
}
4650
Boolean result;
4751
try {
4852
result = predicate.call(t);
4953
} catch (Throwable e) {
5054
Exceptions.throwOrReport(e, this, t);
5155
return;
5256
}
53-
if (!result && !done) {
57+
if (!result) {
5458
done = true;
5559
producer.setValue(false);
5660
unsubscribe();
@@ -61,7 +65,12 @@ public void onNext(T t) {
6165

6266
@Override
6367
public void onError(Throwable e) {
64-
child.onError(e);
68+
if (!done) {
69+
done = true;
70+
child.onError(e);
71+
} else {
72+
RxJavaHooks.onError(e);
73+
}
6574
}
6675

6776
@Override

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,17 @@
2020

2121
import java.util.Arrays;
2222
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
2324
import java.util.concurrent.TimeUnit;
2425

2526
import org.junit.Test;
2627

2728
import rx.*;
29+
import rx.Observable.OnSubscribe;
30+
import rx.functions.Action1;
2831
import rx.functions.Func1;
2932
import rx.observers.TestSubscriber;
33+
import rx.plugins.RxJavaHooks;
3034

3135
public class OperatorAllTest {
3236

@@ -178,4 +182,114 @@ public Boolean call(Object object) {
178182
assertEquals(ex, errors.get(0));
179183
assertTrue(ex.getCause().getMessage().contains("Boo!"));
180184
}
185+
186+
@Test
187+
public void testDoesNotEmitMultipleTerminalEvents() {
188+
TestSubscriber<Boolean> ts = TestSubscriber.create();
189+
Observable.create(new OnSubscribe<Integer>() {
190+
191+
@Override
192+
public void call(final Subscriber<? super Integer> sub) {
193+
sub.setProducer(new Producer() {
194+
195+
@Override
196+
public void request(long n) {
197+
if (n > 0) {
198+
sub.onNext(1);
199+
sub.onCompleted();
200+
}
201+
}
202+
});
203+
}
204+
})
205+
.all(new Func1<Integer,Boolean>() {
206+
207+
@Override
208+
public Boolean call(Integer t) {
209+
throw new RuntimeException("boo");
210+
}})
211+
.unsafeSubscribe(ts);
212+
ts.assertError(RuntimeException.class);
213+
ts.assertNotCompleted();
214+
}
215+
216+
@Test
217+
public void testUpstreamEmitsOnNextAfterFailureWithoutCheckingSubscription() {
218+
TestSubscriber<Boolean> ts = TestSubscriber.create();
219+
Observable.create(new OnSubscribe<Integer>() {
220+
221+
@Override
222+
public void call(final Subscriber<? super Integer> sub) {
223+
sub.setProducer(new Producer() {
224+
225+
@Override
226+
public void request(long n) {
227+
if (n > 1) {
228+
sub.onNext(1);
229+
sub.onNext(2);
230+
}
231+
}
232+
});
233+
}
234+
})
235+
.all(new Func1<Integer,Boolean>() {
236+
boolean once = true;
237+
@Override
238+
public Boolean call(Integer t) {
239+
if (once)
240+
throw new RuntimeException("boo");
241+
else {
242+
once = false;
243+
return true;
244+
}
245+
}})
246+
.unsafeSubscribe(ts);
247+
ts.assertNoValues();
248+
ts.assertError(RuntimeException.class);
249+
ts.assertNotCompleted();
250+
}
251+
252+
@Test
253+
public void testDoesNotEmitMultipleErrorEventsAndReportsSecondErrorToHooks() {
254+
try {
255+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
256+
RxJavaHooks.setOnError(new Action1<Throwable>() {
257+
258+
@Override
259+
public void call(Throwable t) {
260+
list.add(t);
261+
}
262+
});
263+
TestSubscriber<Boolean> ts = TestSubscriber.create();
264+
final RuntimeException e1 = new RuntimeException();
265+
final Throwable e2 = new RuntimeException();
266+
Observable.create(new OnSubscribe<Integer>() {
267+
268+
@Override
269+
public void call(final Subscriber<? super Integer> sub) {
270+
sub.setProducer(new Producer() {
271+
272+
@Override
273+
public void request(long n) {
274+
if (n > 0) {
275+
sub.onNext(1);
276+
sub.onError(e2);
277+
}
278+
}
279+
});
280+
}
281+
}).all(new Func1<Integer, Boolean>() {
282+
283+
@Override
284+
public Boolean call(Integer t) {
285+
throw e1;
286+
}
287+
}).unsafeSubscribe(ts);
288+
ts.assertNotCompleted();
289+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
290+
assertEquals(Arrays.asList(e2), list);
291+
} finally {
292+
RxJavaHooks.setOnError(null);
293+
}
294+
}
181295
}

0 commit comments

Comments
 (0)