Skip to content

Commit 6bb5539

Browse files
committed
Merge pull request #2986 from davidmoten/all-backp
OperatorAll - implement backpressure and include last value in exception cause
2 parents 96c903e + 3faea61 commit 6bb5539

File tree

2 files changed

+68
-9
lines changed

2 files changed

+68
-9
lines changed

src/main/java/rx/internal/operators/OperatorAll.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
20+
import rx.exceptions.Exceptions;
21+
import rx.exceptions.OnErrorThrowable;
2022
import rx.functions.Func1;
23+
import rx.internal.producers.SingleDelayedProducer;
2124

2225
/**
2326
* Returns an Observable that emits a Boolean that indicates whether all items emitted by an
@@ -34,21 +37,27 @@ public OperatorAll(Func1<? super T, Boolean> predicate) {
3437

3538
@Override
3639
public Subscriber<? super T> call(final Subscriber<? super Boolean> child) {
40+
final SingleDelayedProducer<Boolean> producer = new SingleDelayedProducer<Boolean>(child);
3741
Subscriber<T> s = new Subscriber<T>() {
3842
boolean done;
3943

4044
@Override
4145
public void onNext(T t) {
42-
boolean result = predicate.call(t);
46+
Boolean result;
47+
try {
48+
result = predicate.call(t);
49+
} catch (Throwable e) {
50+
Exceptions.throwIfFatal(e);
51+
onError(OnErrorThrowable.addValueAsLastCause(e, t));
52+
return;
53+
}
4354
if (!result && !done) {
4455
done = true;
45-
child.onNext(false);
46-
child.onCompleted();
56+
producer.setValue(false);
4757
unsubscribe();
48-
} else {
49-
// if we drop values we must replace them upstream as downstream won't receive and request more
50-
request(1);
51-
}
58+
}
59+
// note that don't need to request more of upstream because this subscriber
60+
// defaults to requesting Long.MAX_VALUE
5261
}
5362

5463
@Override
@@ -60,12 +69,12 @@ public void onError(Throwable e) {
6069
public void onCompleted() {
6170
if (!done) {
6271
done = true;
63-
child.onNext(true);
64-
child.onCompleted();
72+
producer.setValue(true);
6573
}
6674
}
6775
};
6876
child.add(s);
77+
child.setProducer(producer);
6978
return s;
7079
}
7180
}

src/test/java/rx/internal/operators/OperatorAllTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,14 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.util.Arrays;
22+
import java.util.List;
2223
import java.util.concurrent.TimeUnit;
2324

2425
import org.junit.Test;
2526

2627
import rx.*;
2728
import rx.functions.Func1;
29+
import rx.observers.TestSubscriber;
2830

2931
public class OperatorAllTest {
3032

@@ -128,4 +130,52 @@ public Observable<Integer> call(Boolean t1) {
128130
});
129131
assertEquals((Object)2, source.toBlocking().first());
130132
}
133+
134+
@Test
135+
public void testBackpressureIfNoneRequestedNoneShouldBeDelivered() {
136+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
137+
Observable.empty().all(new Func1<Object, Boolean>() {
138+
@Override
139+
public Boolean call(Object t1) {
140+
return false;
141+
}
142+
}).subscribe(ts);
143+
ts.assertNoValues();
144+
ts.assertNoErrors();
145+
ts.assertNotCompleted();
146+
}
147+
148+
@Test
149+
public void testBackpressureIfOneRequestedOneShouldBeDelivered() {
150+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(1);
151+
Observable.empty().all(new Func1<Object, Boolean>() {
152+
@Override
153+
public Boolean call(Object object) {
154+
return false;
155+
}
156+
}).subscribe(ts);
157+
ts.assertTerminalEvent();
158+
ts.assertNoErrors();
159+
ts.assertCompleted();
160+
ts.assertValue(true);
161+
}
162+
163+
@Test
164+
public void testPredicateThrowsExceptionAndValueInCauseMessage() {
165+
TestSubscriber<Boolean> ts = new TestSubscriber<Boolean>(0);
166+
final IllegalArgumentException ex = new IllegalArgumentException();
167+
Observable.just("Boo!").all(new Func1<Object, Boolean>() {
168+
@Override
169+
public Boolean call(Object object) {
170+
throw ex;
171+
}
172+
}).subscribe(ts);
173+
ts.assertTerminalEvent();
174+
ts.assertNoValues();
175+
ts.assertNotCompleted();
176+
List<Throwable> errors = ts.getOnErrorEvents();
177+
assertEquals(1, errors.size());
178+
assertEquals(ex, errors.get(0));
179+
assertTrue(ex.getCause().getMessage().contains("Boo!"));
180+
}
131181
}

0 commit comments

Comments
 (0)