Skip to content

Commit 915e2ac

Browse files
committed
Merge pull request #2991 from davidmoten/take-until-predicate-last-cause
takeUntil(predicate) - include last value in error cause
2 parents da588a2 + 0fbc407 commit 915e2ac

File tree

2 files changed

+25
-4
lines changed

2 files changed

+25
-4
lines changed

src/main/java/rx/internal/operators/OperatorTakeUntilPredicate.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import rx.Observable.Operator;
1919
import rx.*;
2020
import rx.annotations.Experimental;
21+
import rx.exceptions.Exceptions;
22+
import rx.exceptions.OnErrorThrowable;
2123
import rx.functions.Func1;
2224

2325
/**
@@ -37,15 +39,16 @@ private ParentSubscriber(Subscriber<? super T> child) {
3739
}
3840

3941
@Override
40-
public void onNext(T args) {
41-
child.onNext(args);
42+
public void onNext(T t) {
43+
child.onNext(t);
4244

4345
boolean stop = false;
4446
try {
45-
stop = stopPredicate.call(args);
47+
stop = stopPredicate.call(t);
4648
} catch (Throwable e) {
4749
done = true;
48-
child.onError(e);
50+
Exceptions.throwIfFatal(e);
51+
child.onError(OnErrorThrowable.addValueAsLastCause(e, t));
4952
unsubscribe();
5053
return;
5154
}

src/test/java/rx/internal/operators/OperatorTakeUntilPredicateTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package rx.internal.operators;
1818

19+
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
1921
import static org.mockito.Matchers.any;
2022
import static org.mockito.Mockito.*;
2123

@@ -132,4 +134,20 @@ public void onStart() {
132134
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5));
133135
Assert.assertEquals(0, ts.getOnCompletedEvents().size());
134136
}
137+
138+
@Test
139+
public void testErrorIncludesLastValueAsCause() {
140+
TestSubscriber<String> ts = new TestSubscriber<String>();
141+
final TestException e = new TestException("Forced failure");
142+
Observable.just("abc").takeUntil(new Func1<String, Boolean>() {
143+
@Override
144+
public Boolean call(String t) {
145+
throw e;
146+
}
147+
}).subscribe(ts);
148+
ts.assertTerminalEvent();
149+
ts.assertNotCompleted();
150+
assertEquals(1, (int) ts.getOnErrorEvents().size());
151+
assertTrue(ts.getOnErrorEvents().get(0).getCause().getMessage().contains("abc"));
152+
}
135153
}

0 commit comments

Comments
 (0)