Skip to content

Commit 2abeadb

Browse files
authored
2.x: fix Flowable.flatMapMaybe/Single maxConcurrency not requesting more (#5287)
1 parent 434d1f4 commit 2abeadb

File tree

4 files changed

+60
-0
lines changed

4 files changed

+60
-0
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ void innerError(InnerObserver inner, Throwable e) {
230230
if (!delayErrors) {
231231
s.cancel();
232232
set.dispose();
233+
} else {
234+
if (maxConcurrency != Integer.MAX_VALUE) {
235+
s.request(1);
236+
}
233237
}
234238
active.decrementAndGet();
235239
drain();
@@ -254,12 +258,19 @@ void innerComplete(InnerObserver inner) {
254258
}
255259
return;
256260
}
261+
262+
if (maxConcurrency != Integer.MAX_VALUE) {
263+
s.request(1);
264+
}
257265
if (decrementAndGet() == 0) {
258266
return;
259267
}
260268
drainLoop();
261269
} else {
262270
active.decrementAndGet();
271+
if (maxConcurrency != Integer.MAX_VALUE) {
272+
s.request(1);
273+
}
263274
drain();
264275
}
265276
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ void innerError(InnerObserver inner, Throwable e) {
230230
if (!delayErrors) {
231231
s.cancel();
232232
set.dispose();
233+
} else {
234+
if (maxConcurrency != Integer.MAX_VALUE) {
235+
s.request(1);
236+
}
233237
}
234238
active.decrementAndGet();
235239
drain();

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybeTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,36 @@ public MaybeSource<Integer> apply(Integer v) throws Exception {
320320
.assertResult();
321321
}
322322

323+
@Test
324+
public void asyncFlattenNoneMaxConcurrency() {
325+
Flowable.range(1, 1000)
326+
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
327+
@Override
328+
public MaybeSource<Integer> apply(Integer v) throws Exception {
329+
return Maybe.<Integer>empty().subscribeOn(Schedulers.computation());
330+
}
331+
}, false, 128)
332+
.take(500)
333+
.test()
334+
.awaitDone(5, TimeUnit.SECONDS)
335+
.assertResult();
336+
}
337+
338+
@Test
339+
public void asyncFlattenErrorMaxConcurrency() {
340+
Flowable.range(1, 1000)
341+
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
342+
@Override
343+
public MaybeSource<Integer> apply(Integer v) throws Exception {
344+
return Maybe.<Integer>error(new TestException()).subscribeOn(Schedulers.computation());
345+
}
346+
}, true, 128)
347+
.take(500)
348+
.test()
349+
.awaitDone(5, TimeUnit.SECONDS)
350+
.assertFailure(CompositeException.class);
351+
}
352+
323353
@Test
324354
public void successError() {
325355
final PublishProcessor<Integer> ps = PublishProcessor.create();

src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingleTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,4 +490,19 @@ public void run() {
490490
TestHelper.race(r1, r2);
491491
}
492492
}
493+
494+
@Test
495+
public void asyncFlattenErrorMaxConcurrency() {
496+
Flowable.range(1, 1000)
497+
.flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
498+
@Override
499+
public MaybeSource<Integer> apply(Integer v) throws Exception {
500+
return Maybe.<Integer>error(new TestException()).subscribeOn(Schedulers.computation());
501+
}
502+
}, true, 128)
503+
.take(500)
504+
.test()
505+
.awaitDone(5, TimeUnit.SECONDS)
506+
.assertFailure(CompositeException.class);
507+
}
493508
}

0 commit comments

Comments
 (0)