Skip to content

Commit 425a6f4

Browse files
committed
Merge pull request #2951 from davidmoten/concat-request-overflow
OperatorConcat - prevent request overflow and fix race condition
2 parents 1b25f07 + bad4d40 commit 425a6f4

File tree

2 files changed

+30
-1
lines changed

2 files changed

+30
-1
lines changed

src/main/java/rx/internal/operators/OperatorConcat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ public void onStart() {
115115
private void requestFromChild(long n) {
116116
// we track 'requested' so we know whether we should subscribe the next or not
117117
ConcatInnerSubscriber<T> actualSubscriber = currentSubscriber;
118-
if (REQUESTED_UPDATER.getAndAdd(this, n) == 0) {
118+
if (n > 0 && BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n) == 0) {
119119
if (actualSubscriber == null && wip > 0) {
120120
// this means we may be moving from one subscriber to another after having stopped processing
121121
// so need to kick off the subscribe via this request notification

src/test/java/rx/internal/operators/OperatorConcatTest.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertTrue;
1920
import static org.junit.Assert.fail;
2021
import static org.mockito.Matchers.any;
2122
import static org.mockito.Matchers.anyString;
@@ -35,7 +36,9 @@
3536
import org.mockito.InOrder;
3637

3738
import rx.Observable.OnSubscribe;
39+
import rx.Scheduler.Worker;
3840
import rx.*;
41+
import rx.functions.Action0;
3942
import rx.functions.Func1;
4043
import rx.internal.util.RxRingBuffer;
4144
import rx.observers.TestSubscriber;
@@ -766,4 +769,30 @@ public void onError(Throwable e) {
766769

767770
assertEquals(n, counter.get());
768771
}
772+
773+
@Test
774+
public void testRequestOverflowDoesNotStallStream() {
775+
Observable<Integer> o1 = Observable.just(1,2,3);
776+
Observable<Integer> o2 = Observable.just(4,5,6);
777+
final AtomicBoolean completed = new AtomicBoolean(false);
778+
o1.concatWith(o2).subscribe(new Subscriber<Integer>() {
779+
780+
@Override
781+
public void onCompleted() {
782+
completed.set(true);
783+
}
784+
785+
@Override
786+
public void onError(Throwable e) {
787+
788+
}
789+
790+
@Override
791+
public void onNext(Integer t) {
792+
request(2);
793+
}});
794+
795+
assertTrue(completed.get());
796+
}
797+
769798
}

0 commit comments

Comments
 (0)