Skip to content

Commit 46f5cfc

Browse files
committed
implement backpressure for OperatorAll and include last value in cause if exception occurs
1 parent 96c903e commit 46f5cfc

File tree

2 files changed

+67
-5
lines changed

2 files changed

+67
-5
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.Exceptions;
21+
import rx.exceptions.OnErrorThrowable;
2022
import rx.functions.Func1;
23+
import rx.internal.producers.SingleDelayedProducer;
2124

2225
/**
2326
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
@@ -34,16 +37,23 @@ public OperatorAll(Func1<? super T, Boolean> predicate) {
3437

3538
@Override
3639
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
40+
final SingleDelayedProducer<Boolean> producer = new SingleDelayedProducer<Boolean>(child);
3741
Subscriber<T> s = new Subscriber<T>() {
3842
boolean done;
3943

4044
@Override
4145
public void onNext(T t) {
42-
boolean result = predicate.call(t);
46+
Boolean result;
47+
try {
48+
result = predicate.call(t);
49+
} catch (Throwable e) {
50+
Exceptions.throwIfFatal(e);
51+
onError(OnErrorThrowable.addValueAsLastCause(e, t));
52+
return;
53+
}
4354
if (!result && !done) {
4455
done = true;
45-
child.onNext(false);
46-
child.onCompleted();
56+
producer.setValue(false);
4757
unsubscribe();
4858
} else {
4959
// if we drop values we must replace them upstream as downstream won't receive and request more
@@ -60,12 +70,12 @@ public void onError(Throwable e) {
6070
public void onCompleted() {
6171
if (!done) {
6272
done = true;
63-
child.onNext(true);
64-
child.onCompleted();
73+
producer.setValue(true);
6574
}
6675
}
6776
};
6877
child.add(s);
78+
child.setProducer(producer);
6979
return s;
7080
}
7181
}

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.List;
2223
import java.util.concurrent.TimeUnit;
2324

25+
import org.apache.commons.math3.analysis.FunctionUtils;
2426
import org.junit.Test;
2527

2628
import rx.*;
29+
import rx.Observable.OnSubscribe;
2730
import rx.functions.Func1;
31+
import rx.observers.TestSubscriber;
2832

2933
public class OperatorAllTest {
3034

@@ -128,4 +132,52 @@ public Observable<Integer> call(Boolean t1) {
128132
});
129133
assertEquals((Object)2, source.toBlocking().first());
130134
}
135+
136+
@Test
137+
public void testBackpressureIfNoneRequestedNoneShouldBeDelivered() {
138+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
139+
Observable.empty().all(new Func1<Object, Boolean>() {
140+
@Override
141+
public Boolean call(Object t1) {
142+
return false;
143+
}
144+
}).subscribe(ts);
145+
ts.assertNoValues();
146+
ts.assertNoErrors();
147+
ts.assertNotCompleted();
148+
}
149+
150+
@Test
151+
public void testBackpressureIfOneRequestedOneShouldBeDelivered() {
152+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(1);
153+
Observable.empty().all(new Func1<Object, Boolean>() {
154+
@Override
155+
public Boolean call(Object object) {
156+
return false;
157+
}
158+
}).subscribe(ts);
159+
ts.assertTerminalEvent();
160+
ts.assertNoErrors();
161+
ts.assertCompleted();
162+
ts.assertValue(true);
163+
}
164+
165+
@Test
166+
public void testPredicateThrowsExceptionAndValueInCauseMessage() {
167+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
168+
final IllegalArgumentException ex = new IllegalArgumentException();
169+
Observable.just("Boo!").all(new Func1<Object, Boolean>() {
170+
@Override
171+
public Boolean call(Object object) {
172+
throw ex;
173+
}
174+
}).subscribe(ts);
175+
ts.assertTerminalEvent();
176+
ts.assertNoValues();
177+
ts.assertNotCompleted();
178+
List<Throwable> errors = ts.getOnErrorEvents();
179+
assertEquals(1, errors.size());
180+
assertEquals(ex, errors.get(0));
181+
assertTrue(ex.getCause().getMessage().contains("Boo!"));
182+
}
131183
}

0 commit comments

Comments
 (0)