Skip to content

Commit 689b22d

Browse files
davidmotenakarnokd
authored andcommitted
OperatorAny - prevent multiple terminal events (#4245)
1 parent b72beff commit 689b22d

File tree

2 files changed

+126
-2
lines changed

2 files changed

+126
-2
lines changed

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.exceptions.Exceptions;
2222
import rx.functions.Func1;
2323
import rx.internal.producers.SingleDelayedProducer;
24+
import rx.plugins.RxJavaHooks;
2425

2526
/**
2627
* Returns an {@link Observable} that emits <code>true</code> if any element of
@@ -45,6 +46,9 @@ public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
4546

4647
@Override
4748
public void onNext(T t) {
49+
if (done) {
50+
return;
51+
}
4852
hasElements = true;
4953
boolean result;
5054
try {
@@ -53,7 +57,7 @@ public void onNext(T t) {
5357
Exceptions.throwOrReport(e, this, t);
5458
return;
5559
}
56-
if (result && !done) {
60+
if (result) {
5761
done = true;
5862
producer.setValue(!returnOnEmpty);
5963
unsubscribe();
@@ -64,7 +68,12 @@ public void onNext(T t) {
6468

6569
@Override
6670
public void onError(Throwable e) {
67-
child.onError(e);
71+
if (!done) {
72+
done = true;
73+
child.onError(e);
74+
} else {
75+
RxJavaHooks.onError(e);
76+
}
6877
}
6978

7079
@Override

src/test/java/rx/internal/operators/OperatorAnyTest.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,18 @@
2020

2121
import java.util.Arrays;
2222
import java.util.List;
23+
import java.util.concurrent.CopyOnWriteArrayList;
2324
import java.util.concurrent.TimeUnit;
2425

2526
import org.junit.Test;
2627

2728
import rx.*;
29+
import rx.Observable.OnSubscribe;
30+
import rx.functions.Action1;
2831
import rx.functions.Func1;
2932
import rx.internal.util.UtilityFunctions;
3033
import rx.observers.TestSubscriber;
34+
import rx.plugins.RxJavaHooks;
3135

3236
public class OperatorAnyTest {
3337

@@ -270,4 +274,115 @@ public Boolean call(Object object) {
270274
assertEquals(ex, errors.get(0));
271275
assertTrue(ex.getCause().getMessage().contains("Boo!"));
272276
}
277+
278+
@Test
279+
public void testUpstreamEmitsOnNextAfterFailureWithoutCheckingSubscription() {
280+
TestSubscriber<Boolean> ts = TestSubscriber.create();
281+
Observable.create(new OnSubscribe<Integer>() {
282+
283+
@Override
284+
public void call(final Subscriber<? super Integer> sub) {
285+
sub.setProducer(new Producer() {
286+
287+
@Override
288+
public void request(long n) {
289+
if (n > 1) {
290+
sub.onNext(1);
291+
sub.onNext(2);
292+
}
293+
}
294+
});
295+
}
296+
})
297+
.exists(new Func1<Integer,Boolean>() {
298+
boolean once = true;
299+
@Override
300+
public Boolean call(Integer t) {
301+
if (once)
302+
throw new RuntimeException("boo");
303+
else {
304+
once = false;
305+
return true;
306+
}
307+
}})
308+
.unsafeSubscribe(ts);
309+
ts.assertNoValues();
310+
ts.assertError(RuntimeException.class);
311+
ts.assertNotCompleted();
312+
}
313+
314+
@Test
315+
public void testUpstreamEmitsOnNextWithoutCheckingSubscription() {
316+
TestSubscriber<Boolean> ts = TestSubscriber.create();
317+
Observable.create(new OnSubscribe<Integer>() {
318+
319+
@Override
320+
public void call(final Subscriber<? super Integer> sub) {
321+
sub.setProducer(new Producer() {
322+
323+
@Override
324+
public void request(long n) {
325+
if (n > 1) {
326+
sub.onNext(1);
327+
sub.onNext(2);
328+
sub.onCompleted();
329+
}
330+
}
331+
});
332+
}
333+
})
334+
.exists(new Func1<Integer,Boolean>() {
335+
@Override
336+
public Boolean call(Integer t) {
337+
return true;
338+
}})
339+
.unsafeSubscribe(ts);
340+
ts.assertValue(true);
341+
assertEquals(1, ts.getCompletions());
342+
ts.assertNoErrors();
343+
}
344+
345+
@Test
346+
public void testDoesNotEmitMultipleErrorEventsAndReportsSecondErrorToHooks() {
347+
try {
348+
final List<Throwable> list = new CopyOnWriteArrayList<Throwable>();
349+
RxJavaHooks.setOnError(new Action1<Throwable>() {
350+
351+
@Override
352+
public void call(Throwable t) {
353+
list.add(t);
354+
}
355+
});
356+
TestSubscriber<Boolean> ts = TestSubscriber.create();
357+
final RuntimeException e1 = new RuntimeException();
358+
final Throwable e2 = new RuntimeException();
359+
Observable.create(new OnSubscribe<Integer>() {
360+
361+
@Override
362+
public void call(final Subscriber<? super Integer> sub) {
363+
sub.setProducer(new Producer() {
364+
365+
@Override
366+
public void request(long n) {
367+
if (n > 0) {
368+
sub.onNext(1);
369+
sub.onError(e2);
370+
}
371+
}
372+
});
373+
}
374+
}).exists(new Func1<Integer, Boolean>() {
375+
376+
@Override
377+
public Boolean call(Integer t) {
378+
throw e1;
379+
}
380+
}).unsafeSubscribe(ts);
381+
ts.assertNotCompleted();
382+
assertEquals(Arrays.asList(e1), ts.getOnErrorEvents());
383+
assertEquals(Arrays.asList(e2), list);
384+
} finally {
385+
RxJavaHooks.setOnError(null);
386+
}
387+
}
273388
}

0 commit comments

Comments
 (0)