Skip to content

Commit 3d6403b

Browse files
authored
2.x: Fix Flowable.concatMap backpressure w/ scalars (#7091)
* 2.x: Fix Flowable.concatMap backpressure w/ scalars * Replace Java 8 constructs
1 parent b13a45e commit 3d6403b

File tree

2 files changed

+74
-10
lines changed

2 files changed

+74
-10
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
package io.reactivex.internal.operators.flowable;
1414

1515
import java.util.concurrent.Callable;
16-
import java.util.concurrent.atomic.AtomicInteger;
16+
import java.util.concurrent.atomic.*;
1717

1818
import org.reactivestreams.*;
1919

@@ -332,7 +332,7 @@ void drain() {
332332
continue;
333333
} else {
334334
active = true;
335-
inner.setSubscription(new WeakScalarSubscription<R>(vr, inner));
335+
inner.setSubscription(new SimpleScalarSubscription<R>(vr, inner));
336336
}
337337

338338
} else {
@@ -349,20 +349,20 @@ void drain() {
349349
}
350350
}
351351

352-
static final class WeakScalarSubscription<T> implements Subscription {
352+
static final class SimpleScalarSubscription<T>
353+
extends AtomicBoolean
354+
implements Subscription {
353355
final Subscriber<? super T> downstream;
354356
final T value;
355-
boolean once;
356357

357-
WeakScalarSubscription(T value, Subscriber<? super T> downstream) {
358+
SimpleScalarSubscription(T value, Subscriber<? super T> downstream) {
358359
this.value = value;
359360
this.downstream = downstream;
360361
}
361362

362363
@Override
363364
public void request(long n) {
364-
if (n > 0 && !once) {
365-
once = true;
365+
if (n > 0 && compareAndSet(false, true)) {
366366
Subscriber<? super T> a = downstream;
367367
a.onNext(value);
368368
a.onComplete();
@@ -538,7 +538,7 @@ void drain() {
538538
continue;
539539
} else {
540540
active = true;
541-
inner.setSubscription(new WeakScalarSubscription<R>(vr, inner));
541+
inner.setSubscription(new SimpleScalarSubscription<R>(vr, inner));
542542
}
543543
} else {
544544
active = true;

src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapTest.java

Lines changed: 66 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@
2424
import io.reactivex.*;
2525
import io.reactivex.exceptions.*;
2626
import io.reactivex.functions.*;
27-
import io.reactivex.internal.operators.flowable.FlowableConcatMap.WeakScalarSubscription;
27+
import io.reactivex.internal.functions.Functions;
28+
import io.reactivex.internal.operators.flowable.FlowableConcatMap.SimpleScalarSubscription;
29+
import io.reactivex.processors.PublishProcessor;
2830
import io.reactivex.schedulers.Schedulers;
2931
import io.reactivex.subscribers.TestSubscriber;
3032

@@ -33,7 +35,7 @@ public class FlowableConcatMapTest {
3335
@Test
3436
public void weakSubscriptionRequest() {
3537
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
36-
WeakScalarSubscription<Integer> ws = new WeakScalarSubscription<Integer>(1, ts);
38+
SimpleScalarSubscription<Integer> ws = new SimpleScalarSubscription<Integer>(1, ts);
3739
ts.onSubscribe(ws);
3840

3941
ws.request(0);
@@ -105,6 +107,68 @@ public Publisher<? extends Object> apply(String v)
105107
.assertResult("RxSingleScheduler");
106108
}
107109

110+
@Test
111+
public void innerScalarRequestRace() {
112+
final Flowable<Integer> just = Flowable.just(1);
113+
final int n = 1000;
114+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
115+
final PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
116+
117+
final TestSubscriber<Integer> ts = source
118+
.concatMap(Functions.<Flowable<Integer>>identity(), n + 1)
119+
.test(1L);
120+
121+
TestHelper.race(new Runnable() {
122+
@Override
123+
public void run() {
124+
for (int j = 0; j < n; j++) {
125+
source.onNext(just);
126+
}
127+
}
128+
}, new Runnable() {
129+
@Override
130+
public void run() {
131+
for (int j = 0; j < n; j++) {
132+
ts.request(1);
133+
}
134+
}
135+
});
136+
137+
ts.assertValueCount(n);
138+
}
139+
}
140+
141+
@Test
142+
public void innerScalarRequestRaceDelayError() {
143+
final Flowable<Integer> just = Flowable.just(1);
144+
final int n = 1000;
145+
for (int i = 0; i < TestHelper.RACE_DEFAULT_LOOPS; i++) {
146+
final PublishProcessor<Flowable<Integer>> source = PublishProcessor.create();
147+
148+
final TestSubscriber<Integer> ts = source
149+
.concatMapDelayError(Functions.<Flowable<Integer>>identity(), n + 1, true)
150+
.test(1L);
151+
152+
TestHelper.race(new Runnable() {
153+
@Override
154+
public void run() {
155+
for (int j = 0; j < n; j++) {
156+
source.onNext(just);
157+
}
158+
}
159+
}, new Runnable() {
160+
@Override
161+
public void run() {
162+
for (int j = 0; j < n; j++) {
163+
ts.request(1);
164+
}
165+
}
166+
});
167+
168+
ts.assertValueCount(n);
169+
}
170+
}
171+
108172
@Test
109173
public void pollThrows() {
110174
Flowable.just(1)

0 commit comments

Comments
 (0)