File tree Expand file tree Collapse file tree 2 files changed +25
-4
lines changed
main/java/rx/internal/operators
test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +25
-4
lines changed Original file line number Diff line number Diff line change 17
17
18
18
import rx .Observable .Operator ;
19
19
import rx .Subscriber ;
20
+ import rx .exceptions .Exceptions ;
21
+ import rx .exceptions .OnErrorThrowable ;
20
22
import rx .functions .Func1 ;
21
23
import rx .functions .Func2 ;
22
24
@@ -52,18 +54,19 @@ public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
52
54
private boolean done = false ;
53
55
54
56
@ Override
55
- public void onNext (T args ) {
57
+ public void onNext (T t ) {
56
58
boolean isSelected ;
57
59
try {
58
- isSelected = predicate .call (args , counter ++);
60
+ isSelected = predicate .call (t , counter ++);
59
61
} catch (Throwable e ) {
60
62
done = true ;
61
- subscriber .onError (e );
63
+ Exceptions .throwIfFatal (e );
64
+ subscriber .onError (OnErrorThrowable .addValueAsLastCause (e , t ));
62
65
unsubscribe ();
63
66
return ;
64
67
}
65
68
if (isSelected ) {
66
- subscriber .onNext (args );
69
+ subscriber .onNext (t );
67
70
} else {
68
71
done = true ;
69
72
subscriber .onCompleted ();
Original file line number Diff line number Diff line change 15
15
*/
16
16
package rx .internal .operators ;
17
17
18
+ import static org .junit .Assert .assertTrue ;
18
19
import static org .junit .Assert .fail ;
19
20
import static org .mockito .Matchers .any ;
20
21
import static org .mockito .Mockito .*;
25
26
26
27
import rx .*;
27
28
import rx .Observable .OnSubscribe ;
29
+ import rx .exceptions .TestException ;
28
30
import rx .functions .Func1 ;
29
31
import rx .observers .TestSubscriber ;
30
32
import rx .subjects .*;
@@ -261,4 +263,20 @@ public Boolean call(Integer t1) {
261
263
262
264
Assert .assertFalse ("Unsubscribed!" , ts .isUnsubscribed ());
263
265
}
266
+
267
+ @ Test
268
+ public void testErrorCauseIncludesLastValue () {
269
+ TestSubscriber <String > ts = new TestSubscriber <String >();
270
+ Observable .just ("abc" ).takeWhile (new Func1 <String , Boolean >() {
271
+ @ Override
272
+ public Boolean call (String t1 ) {
273
+ throw new TestException ();
274
+ }
275
+ }).subscribe (ts );
276
+
277
+ ts .assertTerminalEvent ();
278
+ ts .assertNoValues ();
279
+ assertTrue (ts .getOnErrorEvents ().get (0 ).getCause ().getMessage ().contains ("abc" ));
280
+ }
281
+
264
282
}
You can’t perform that action at this time.
0 commit comments