Skip to content

Commit 142f31e

Browse files
committed
add backpressure support for OperatorAny and include last value in exception cause
1 parent ae09b86 commit 142f31e

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

src/main/java/rx/internal/operators/OperatorAny.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@
1919
import rx.Observable;
2020
import rx.Observable.Operator;
2121
import rx.Subscriber;
22+
import rx.exceptions.Exceptions;
23+
import rx.exceptions.OnErrorThrowable;
2224
import rx.functions.Func1;
25+
import rx.internal.producers.SingleDelayedProducer;
2326

2427
/**
2528
* Returns an {@link Observable} that emits <code>true</code> if any element of
@@ -36,23 +39,29 @@ public OperatorAny(Func1<? super T, Boolean> predicate, boolean returnOnEmpty) {
3639

3740
@Override
3841
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
42+
final SingleDelayedProducer<Boolean> producer = new SingleDelayedProducer<Boolean>(child);
3943
Subscriber<T> s = new Subscriber<T>() {
4044
boolean hasElements;
4145
boolean done;
4246

4347
@Override
4448
public void onNext(T t) {
4549
hasElements = true;
46-
boolean result = predicate.call(t);
50+
boolean result;
51+
try {
52+
result = predicate.call(t);
53+
} catch (Throwable e) {
54+
Exceptions.throwIfFatal(e);
55+
onError(OnErrorThrowable.addValueAsLastCause(e, t));
56+
return;
57+
}
4758
if (result && !done) {
4859
done = true;
49-
child.onNext(!returnOnEmpty);
50-
child.onCompleted();
60+
producer.setValue(!returnOnEmpty);
5161
unsubscribe();
52-
} else {
53-
// if we drop values we must replace them upstream as downstream won't receive and request more
54-
request(1);
55-
}
62+
}
63+
// note that don't need to request more of upstream because this subscriber
64+
// defaults to requesting Long.MAX_VALUE
5665
}
5766

5867
@Override
@@ -65,16 +74,16 @@ public void onCompleted() {
6574
if (!done) {
6675
done = true;
6776
if (hasElements) {
68-
child.onNext(false);
77+
producer.setValue(false);
6978
} else {
70-
child.onNext(returnOnEmpty);
79+
producer.setValue(returnOnEmpty);
7180
}
72-
child.onCompleted();
7381
}
7482
}
7583

7684
};
7785
child.add(s);
86+
child.setProducer(producer);
7887
return s;
7988
}
8089
}

src/test/java/rx/internal/operators/OperatorAnyTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.List;
2223
import java.util.concurrent.TimeUnit;
2324

2425
import org.junit.Test;
2526

2627
import rx.*;
2728
import rx.functions.Func1;
2829
import rx.internal.util.UtilityFunctions;
30+
import rx.observers.TestSubscriber;
2931

3032
public class OperatorAnyTest {
3133

@@ -220,4 +222,52 @@ public Observable<Integer> call(Boolean t1) {
220222
});
221223
assertEquals((Object)2, source.toBlocking().first());
222224
}
225+
226+
@Test
227+
public void testBackpressureIfNoneRequestedNoneShouldBeDelivered() {
228+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
229+
Observable.just(1).exists(new Func1<Object, Boolean>() {
230+
@Override
231+
public Boolean call(Object t1) {
232+
return true;
233+
}
234+
}).subscribe(ts);
235+
ts.assertNoValues();
236+
ts.assertNoErrors();
237+
ts.assertNotCompleted();
238+
}
239+
240+
@Test
241+
public void testBackpressureIfOneRequestedOneShouldBeDelivered() {
242+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(1);
243+
Observable.just(1).exists(new Func1<Object, Boolean>() {
244+
@Override
245+
public Boolean call(Object object) {
246+
return true;
247+
}
248+
}).subscribe(ts);
249+
ts.assertTerminalEvent();
250+
ts.assertNoErrors();
251+
ts.assertCompleted();
252+
ts.assertValue(true);
253+
}
254+
255+
@Test
256+
public void testPredicateThrowsExceptionAndValueInCauseMessage() {
257+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
258+
final IllegalArgumentException ex = new IllegalArgumentException();
259+
Observable.just("Boo!").exists(new Func1<Object, Boolean>() {
260+
@Override
261+
public Boolean call(Object object) {
262+
throw ex;
263+
}
264+
}).subscribe(ts);
265+
ts.assertTerminalEvent();
266+
ts.assertNoValues();
267+
ts.assertNotCompleted();
268+
List<Throwable> errors = ts.getOnErrorEvents();
269+
assertEquals(1, errors.size());
270+
assertEquals(ex, errors.get(0));
271+
assertTrue(ex.getCause().getMessage().contains("Boo!"));
272+
}
223273
}

0 commit comments

Comments
 (0)