Skip to content

Commit 07eefcd

Browse files
committed
Fix take swallowing exception if thrown by the exactly the nth onNext
call to it.
1 parent f036cd0 commit 07eefcd

File tree

2 files changed

+43
-32
lines changed

2 files changed

+43
-32
lines changed

src/main/java/rx/internal/operators/OperatorTake.java

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,33 +43,41 @@ public OperatorTake(int limit) {
4343
public Subscriber<? super T> call(final Subscriber<? super T> child) {
4444
final Subscriber<T> parent = new Subscriber<T>() {
4545

46-
int count = 0;
47-
boolean completed = false;
46+
int count;
47+
boolean completed;
4848

4949
@Override
5050
public void onCompleted() {
5151
if (!completed) {
52+
completed = true;
5253
child.onCompleted();
5354
}
5455
}
5556

5657
@Override
5758
public void onError(Throwable e) {
5859
if (!completed) {
59-
child.onError(e);
60+
completed = true;
61+
try {
62+
child.onError(e);
63+
} finally {
64+
unsubscribe();
65+
}
6066
}
6167
}
6268

6369
@Override
6470
public void onNext(T i) {
6571
if (!isUnsubscribed()) {
66-
if (++count >= limit) {
67-
completed = true;
68-
}
72+
boolean stop = ++count >= limit;
6973
child.onNext(i);
70-
if (completed) {
71-
child.onCompleted();
72-
unsubscribe();
74+
if (stop && !completed) {
75+
completed = true;
76+
try {
77+
child.onCompleted();
78+
} finally {
79+
unsubscribe();
80+
}
7381
}
7482
}
7583
}

src/test/java/rx/internal/operators/OperatorTakeTest.java

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,36 +16,21 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.any;
20-
import static org.mockito.Matchers.anyString;
21-
import static org.mockito.Mockito.inOrder;
22-
import static org.mockito.Mockito.mock;
23-
import static org.mockito.Mockito.never;
24-
import static org.mockito.Mockito.times;
25-
import static org.mockito.Mockito.verify;
26-
import static org.mockito.Mockito.verifyNoMoreInteractions;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
2721

2822
import java.util.Arrays;
29-
import java.util.concurrent.CountDownLatch;
30-
import java.util.concurrent.TimeUnit;
31-
import java.util.concurrent.atomic.AtomicBoolean;
32-
import java.util.concurrent.atomic.AtomicInteger;
33-
import java.util.concurrent.atomic.AtomicLong;
34-
import java.util.concurrent.atomic.AtomicReference;
23+
import java.util.concurrent.*;
24+
import java.util.concurrent.atomic.*;
3525

3626
import org.junit.Test;
3727
import org.mockito.InOrder;
3828

39-
import rx.Observable;
29+
import rx.*;
4030
import rx.Observable.OnSubscribe;
41-
import rx.Observer;
42-
import rx.Producer;
43-
import rx.Subscriber;
44-
import rx.Subscription;
45-
import rx.functions.Action1;
46-
import rx.functions.Func1;
47-
import rx.observers.Subscribers;
48-
import rx.observers.TestSubscriber;
31+
import rx.exceptions.TestException;
32+
import rx.functions.*;
33+
import rx.observers.*;
4934
import rx.schedulers.Schedulers;
5035

5136
public class OperatorTakeTest {
@@ -414,4 +399,22 @@ public void call(Long n) {
414399
ts.assertNoErrors();
415400
assertEquals(2,requests.get());
416401
}
402+
403+
@Test
404+
public void takeFinalValueThrows() {
405+
Observable<Integer> source = Observable.just(1).take(1);
406+
407+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
408+
@Override
409+
public void onNext(Integer t) {
410+
throw new TestException();
411+
}
412+
};
413+
414+
source.subscribe(ts);
415+
416+
ts.assertNoValues();
417+
ts.assertError(TestException.class);
418+
ts.assertNotCompleted();
419+
}
417420
}

0 commit comments

Comments
 (0)