Skip to content

Commit b95cc5d

Browse files
committed
OperatorAny - prevent multiple terminal events
1 parent 0bdff66 commit b95cc5d

File tree

2 files changed

+36
-1
lines changed

2 files changed

+36
-1
lines changed

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,10 @@ public void onNext(T t) {
6464

6565
@Override
6666
public void onError(Throwable e) {
67-
child.onError(e);
67+
if (!done) {
68+
done = true;
69+
child.onError(e);
70+
}
6871
}
6972

7073
@Override

src/test/java/rx/internal/operators/OperatorAnyTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.junit.Test;
2626

2727
import rx.*;
28+
import rx.Observable.OnSubscribe;
2829
import rx.functions.Func1;
2930
import rx.internal.util.UtilityFunctions;
3031
import rx.observers.TestSubscriber;
@@ -270,4 +271,35 @@ public Boolean call(Object object) {
270271
assertEquals(ex, errors.get(0));
271272
assertTrue(ex.getCause().getMessage().contains("Boo!"));
272273
}
274+
275+
276+
@Test
277+
public void testDoesNotEmitMultipleTerminalEvents() {
278+
TestSubscriber<Boolean> ts = TestSubscriber.create();
279+
Observable.create(new OnSubscribe<Integer>() {
280+
281+
@Override
282+
public void call(final Subscriber<? super Integer> sub) {
283+
sub.setProducer(new Producer() {
284+
285+
@Override
286+
public void request(long n) {
287+
if (n > 0) {
288+
sub.onNext(1);
289+
sub.onCompleted();
290+
}
291+
}
292+
});
293+
}
294+
})
295+
.exists(new Func1<Integer,Boolean>() {
296+
297+
@Override
298+
public Boolean call(Integer t) {
299+
throw new RuntimeException("boo");
300+
}})
301+
.unsafeSubscribe(ts);
302+
ts.assertError(RuntimeException.class);
303+
ts.assertNotCompleted();
304+
}
273305
}

0 commit comments

Comments
 (0)