Skip to content

Commit 9bc355f

Browse files
committed
Merge pull request #3377 from akarnokd/TakeReentrancyFix2x
2.x: fix take() reentrancy problem.
2 parents 98ca7ca + 32f9366 commit 9bc355f

File tree

3 files changed

+35
-2
lines changed

3 files changed

+35
-2
lines changed

src/main/java/io/reactivex/internal/operators/OperatorTake.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,10 @@ public void onSubscribe(Subscription s) {
6262
}
6363
@Override
6464
public void onNext(T t) {
65-
if (!done) {
65+
if (!done && remaining-- > 0) {
66+
boolean stop = remaining == 0;
6667
actual.onNext(t);
67-
if (--remaining == 0L) {
68+
if (stop) {
6869
onComplete();
6970
}
7071
}

src/test/java/io/reactivex/internal/operators/OperatorTakeTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.reactivex.exceptions.TestException;
3131
import io.reactivex.internal.subscriptions.*;
3232
import io.reactivex.schedulers.Schedulers;
33+
import io.reactivex.subjects.PublishSubject;
3334
import io.reactivex.subscribers.TestSubscriber;
3435

3536
public class OperatorTakeTest {
@@ -420,4 +421,19 @@ public void onNext(Integer t) {
420421
ts.assertError(TestException.class);
421422
ts.assertNotComplete();
422423
}
424+
425+
@Test
426+
public void testReentrantTake() {
427+
PublishSubject<Integer> source = PublishSubject.create();
428+
429+
TestSubscriber<Integer> ts = new TestSubscriber<>();
430+
431+
source.take(1).doOnNext(v -> source.onNext(2)).subscribe(ts);
432+
433+
source.onNext(1);
434+
435+
ts.assertValue(1);
436+
ts.assertNoErrors();
437+
ts.assertComplete();
438+
}
423439
}

src/test/java/io/reactivex/internal/operators/nbp/NbpOperatorTakeTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.reactivex.exceptions.TestException;
3232
import io.reactivex.internal.disposables.EmptyDisposable;
3333
import io.reactivex.schedulers.Schedulers;
34+
import io.reactivex.subjects.nbp.NbpPublishSubject;
3435
import io.reactivex.subscribers.nbp.NbpTestSubscriber;
3536

3637
public class NbpOperatorTakeTest {
@@ -338,4 +339,19 @@ public void onNext(Integer t) {
338339
ts.assertError(TestException.class);
339340
ts.assertNotComplete();
340341
}
342+
343+
@Test
344+
public void testReentrantTake() {
345+
NbpPublishSubject<Integer> source = NbpPublishSubject.create();
346+
347+
NbpTestSubscriber<Integer> ts = new NbpTestSubscriber<>();
348+
349+
source.take(1).doOnNext(v -> source.onNext(2)).subscribe(ts);
350+
351+
source.onNext(1);
352+
353+
ts.assertValue(1);
354+
ts.assertNoErrors();
355+
ts.assertComplete();
356+
}
341357
}

0 commit comments

Comments
 (0)