Skip to content

Commit 52a33fd

Browse files
Merge pull request ReactiveX#453 from zsxwing/issue-417
Fixed issue ReactiveX#417
2 parents a29a386 + 8fe4367 commit 52a33fd

File tree

1 file changed

+48
-12
lines changed

1 file changed

+48
-12
lines changed

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.never;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
2124

2225
import java.util.HashMap;
2326
import java.util.Map;
27+
import java.util.concurrent.CountDownLatch;
2428
import java.util.concurrent.atomic.AtomicInteger;
2529

2630
import org.junit.Before;
@@ -33,6 +37,7 @@
3337
import rx.Observable.OnSubscribeFunc;
3438
import rx.Observer;
3539
import rx.Subscription;
40+
import rx.concurrency.Schedulers;
3641
import rx.util.functions.Func1;
3742
import rx.util.functions.Func2;
3843

@@ -59,17 +64,12 @@ public final class OperationMap {
5964
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
6065
*/
6166
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
62-
return new OnSubscribeFunc<R>() {
63-
@Override
64-
public Subscription onSubscribe(Observer<? super R> observer) {
65-
return new MapObservable<T, R>(sequence, new Func2<T, Integer, R>() {
67+
return mapWithIndex(sequence, new Func2<T, Integer, R>() {
6668
@Override
6769
public R call(T value, @SuppressWarnings("unused") Integer unused) {
6870
return func.call(value);
6971
}
70-
}).onSubscribe(observer);
71-
}
72-
};
72+
});
7373
}
7474

7575
/**
@@ -136,7 +136,8 @@ public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer,
136136

137137
@Override
138138
public Subscription onSubscribe(final Observer<? super R> observer) {
139-
return sequence.subscribe(new Observer<T>() {
139+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
140+
return subscription.wrap(sequence.subscribe(new SafeObserver<T>(subscription, new Observer<T>() {
140141
@Override
141142
public void onNext(T value) {
142143
observer.onNext(func.call(value, index));
@@ -152,7 +153,7 @@ public void onError(Throwable ex) {
152153
public void onCompleted() {
153154
observer.onCompleted();
154155
}
155-
});
156+
})));
156157
}
157158
}
158159

@@ -366,6 +367,41 @@ public String call(String s) {
366367
assertEquals(1, c2.get());
367368
}
368369

370+
@Test(expected = IllegalArgumentException.class)
371+
public void testMapWithIssue417() {
372+
Observable.from(1).observeOn(Schedulers.threadPoolForComputation())
373+
.map(new Func1<Integer, Integer>() {
374+
public Integer call(Integer arg0) {
375+
throw new IllegalArgumentException("any error");
376+
}
377+
}).toBlockingObservable().single();
378+
}
379+
380+
@Test
381+
public void testMapWithErrorInFuncAndThreadPoolScheduler() throws InterruptedException {
382+
// The error will throw in one of threads in the thread pool.
383+
// If map does not handle it, the error will disappear.
384+
// so map needs to handle the error by itself.
385+
final CountDownLatch latch = new CountDownLatch(1);
386+
Observable<String> m = Observable.from("one")
387+
.observeOn(Schedulers.threadPoolForComputation())
388+
.map(new Func1<String, String>() {
389+
public String call(String arg0) {
390+
try {
391+
throw new IllegalArgumentException("any error");
392+
} finally {
393+
latch.countDown();
394+
}
395+
}
396+
});
397+
398+
m.subscribe(stringObserver);
399+
latch.await();
400+
InOrder inorder = inOrder(stringObserver);
401+
inorder.verify(stringObserver, times(1)).onError(any(IllegalArgumentException.class));
402+
inorder.verifyNoMoreInteractions();
403+
}
404+
369405
private static Map<String, String> getMap(String prefix) {
370406
Map<String, String> m = new HashMap<String, String>();
371407
m.put("firstName", prefix + "First");

0 commit comments

Comments
 (0)