15
15
*/
16
16
package rx .operators ;
17
17
18
- import static org .junit .Assert .*;
19
- import static org .mockito .Matchers .*;
20
- import static org .mockito .Mockito .*;
18
+ import static org .junit .Assert .assertEquals ;
19
+ import static org .mockito .Matchers .any ;
20
+ import static org .mockito .Mockito .inOrder ;
21
+ import static org .mockito .Mockito .never ;
22
+ import static org .mockito .Mockito .times ;
23
+ import static org .mockito .Mockito .verify ;
21
24
22
25
import java .util .HashMap ;
23
26
import java .util .Map ;
27
+ import java .util .concurrent .CountDownLatch ;
24
28
import java .util .concurrent .atomic .AtomicInteger ;
25
29
26
30
import org .junit .Before ;
33
37
import rx .Observable .OnSubscribeFunc ;
34
38
import rx .Observer ;
35
39
import rx .Subscription ;
40
+ import rx .concurrency .Schedulers ;
36
41
import rx .util .functions .Func1 ;
37
42
import rx .util .functions .Func2 ;
38
43
@@ -59,17 +64,12 @@ public final class OperationMap {
59
64
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
60
65
*/
61
66
public static <T , R > OnSubscribeFunc <R > map (final Observable <? extends T > sequence , final Func1 <? super T , ? extends R > func ) {
62
- return new OnSubscribeFunc <R >() {
63
- @ Override
64
- public Subscription onSubscribe (Observer <? super R > observer ) {
65
- return new MapObservable <T , R >(sequence , new Func2 <T , Integer , R >() {
67
+ return mapWithIndex (sequence , new Func2 <T , Integer , R >() {
66
68
@ Override
67
69
public R call (T value , @ SuppressWarnings ("unused" ) Integer unused ) {
68
70
return func .call (value );
69
71
}
70
- }).onSubscribe (observer );
71
- }
72
- };
72
+ });
73
73
}
74
74
75
75
/**
@@ -136,7 +136,8 @@ public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer,
136
136
137
137
@ Override
138
138
public Subscription onSubscribe (final Observer <? super R > observer ) {
139
- return sequence .subscribe (new Observer <T >() {
139
+ final SafeObservableSubscription subscription = new SafeObservableSubscription ();
140
+ return subscription .wrap (sequence .subscribe (new SafeObserver <T >(subscription , new Observer <T >() {
140
141
@ Override
141
142
public void onNext (T value ) {
142
143
observer .onNext (func .call (value , index ));
@@ -152,7 +153,7 @@ public void onError(Throwable ex) {
152
153
public void onCompleted () {
153
154
observer .onCompleted ();
154
155
}
155
- });
156
+ }))) ;
156
157
}
157
158
}
158
159
@@ -366,6 +367,41 @@ public String call(String s) {
366
367
assertEquals (1 , c2 .get ());
367
368
}
368
369
370
+ @ Test (expected = IllegalArgumentException .class )
371
+ public void testMapWithIssue417 () {
372
+ Observable .from (1 ).observeOn (Schedulers .threadPoolForComputation ())
373
+ .map (new Func1 <Integer , Integer >() {
374
+ public Integer call (Integer arg0 ) {
375
+ throw new IllegalArgumentException ("any error" );
376
+ }
377
+ }).toBlockingObservable ().single ();
378
+ }
379
+
380
+ @ Test
381
+ public void testMapWithErrorInFuncAndThreadPoolScheduler () throws InterruptedException {
382
+ // The error will throw in one of threads in the thread pool.
383
+ // If map does not handle it, the error will disappear.
384
+ // so map needs to handle the error by itself.
385
+ final CountDownLatch latch = new CountDownLatch (1 );
386
+ Observable <String > m = Observable .from ("one" )
387
+ .observeOn (Schedulers .threadPoolForComputation ())
388
+ .map (new Func1 <String , String >() {
389
+ public String call (String arg0 ) {
390
+ try {
391
+ throw new IllegalArgumentException ("any error" );
392
+ } finally {
393
+ latch .countDown ();
394
+ }
395
+ }
396
+ });
397
+
398
+ m .subscribe (stringObserver );
399
+ latch .await ();
400
+ InOrder inorder = inOrder (stringObserver );
401
+ inorder .verify (stringObserver , times (1 )).onError (any (IllegalArgumentException .class ));
402
+ inorder .verifyNoMoreInteractions ();
403
+ }
404
+
369
405
private static Map <String , String > getMap (String prefix ) {
370
406
Map <String , String > m = new HashMap <String , String >();
371
407
m .put ("firstName" , prefix + "First" );
0 commit comments