Skip to content

Commit 14aeb00

Browse files
Merge pull request #1832 from benjchristensen/thread-interrupts-take
Fix Take Early Unsubscription Causing Interrupts
2 parents c6c45f0 + af973b8 commit 14aeb00

File tree

2 files changed

+35
-12
lines changed

2 files changed

+35
-12
lines changed

src/main/java/rx/internal/operators/OperatorTake.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,11 @@ public void onNext(T i) {
6363
if (!isUnsubscribed()) {
6464
if (++count >= limit) {
6565
completed = true;
66-
// unsubscribe before emitting onNext so shutdown happens before possible effects
67-
// of onNext such as product.request(n) calls be sent upstream.
68-
unsubscribe();
6966
}
7067
child.onNext(i);
7168
if (completed) {
7269
child.onCompleted();
70+
unsubscribe();
7371
}
7472
}
7573
}
@@ -83,11 +81,13 @@ public void setProducer(final Producer producer) {
8381

8482
@Override
8583
public void request(long n) {
86-
long c = limit - count;
87-
if (n < c) {
88-
producer.request(n);
89-
} else {
90-
producer.request(c);
84+
if (!completed) {
85+
long c = limit - count;
86+
if (n < c) {
87+
producer.request(n);
88+
} else {
89+
producer.request(c);
90+
}
9191
}
9292
}
9393
});

src/test/java/rx/internal/operators/OperatorTakeTest.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
20-
import static org.junit.Assert.fail;
18+
import static org.junit.Assert.*;
2119
import static org.mockito.Matchers.any;
2220
import static org.mockito.Matchers.anyString;
2321
import static org.mockito.Mockito.inOrder;
@@ -28,9 +26,11 @@
2826
import static org.mockito.Mockito.verifyNoMoreInteractions;
2927

3028
import java.util.Arrays;
29+
import java.util.concurrent.CountDownLatch;
3130
import java.util.concurrent.atomic.AtomicBoolean;
3231
import java.util.concurrent.atomic.AtomicInteger;
3332
import java.util.concurrent.atomic.AtomicLong;
33+
import java.util.concurrent.atomic.AtomicReference;
3434

3535
import org.junit.Test;
3636
import org.mockito.InOrder;
@@ -43,7 +43,6 @@
4343
import rx.Subscription;
4444
import rx.functions.Action1;
4545
import rx.functions.Func1;
46-
import rx.internal.operators.OperatorTake;
4746
import rx.observers.Subscribers;
4847
import rx.observers.TestSubscriber;
4948
import rx.schedulers.Schedulers;
@@ -365,4 +364,28 @@ public void request(long n) {
365364
}).take(1).subscribe(ts);
366365
assertEquals(1, requested.get());
367366
}
367+
368+
@Test
369+
public void testInterrupt() throws InterruptedException {
370+
final AtomicReference<Object> exception = new AtomicReference<Object>();
371+
final CountDownLatch latch = new CountDownLatch(1);
372+
Observable.just(1).subscribeOn(Schedulers.computation()).take(1).subscribe(new Action1<Integer>() {
373+
374+
@Override
375+
public void call(Integer t1) {
376+
try {
377+
Thread.sleep(100);
378+
} catch (Exception e) {
379+
exception.set(e);
380+
e.printStackTrace();
381+
} finally {
382+
latch.countDown();
383+
}
384+
}
385+
386+
});
387+
388+
latch.await();
389+
assertNull(exception.get());
390+
}
368391
}

0 commit comments

Comments
 (0)