Skip to content

Commit 6aee6f8

Browse files
ADI1133akarnokd
authored andcommitted
1.x: Add maxConcurrent parameter to concatMapEager
* Add maxConcurrent parameter to concatMapEager * Improve ConcatMapEager performance and add unit test * Add test case and cleanup whitespace
1 parent c8e1b03 commit 6aee6f8

File tree

3 files changed

+74
-8
lines changed

3 files changed

+74
-8
lines changed

src/main/java/rx/Observable.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5353,7 +5353,41 @@ public final <R> Observable<R> concatMapEager(Func1<? super T, ? extends Observa
53535353
if (capacityHint < 1) {
53545354
throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
53555355
}
5356-
return lift(new OperatorEagerConcatMap<T, R>(mapper, capacityHint));
5356+
return lift(new OperatorEagerConcatMap<T, R>(mapper, capacityHint, Integer.MAX_VALUE));
5357+
}
5358+
5359+
/**
5360+
* Maps a sequence of values into Observables and concatenates these Observables eagerly into a single
5361+
* Observable.
5362+
* <p>
5363+
* Eager concatenation means that once a subscriber subscribes, this operator subscribes to all of the
5364+
* source Observables. The operator buffers the values emitted by these Observables and then drains them in
5365+
* order, each one after the previous one completes.
5366+
* <dl>
5367+
* <dt><b>Backpressure:</b></dt>
5368+
* <dd>Backpressure is honored towards the downstream, however, due to the eagerness requirement, sources
5369+
* are subscribed to in unbounded mode and their values are queued up in an unbounded buffer.</dd>
5370+
* <dt><b>Scheduler:</b></dt>
5371+
* <dd>This method does not operate by default on a particular {@link Scheduler}.</dd>
5372+
* </dl>
5373+
* @param <R> the value type
5374+
* @param mapper the function that maps a sequence of values into a sequence of Observables that will be
5375+
* eagerly concatenated
5376+
* @param capacityHint hints about the number of expected source sequence values
5377+
* @param maxConcurrent the maximum number of concurrent subscribed observables
5378+
* @return
5379+
* @warn javadoc fails to describe the return value
5380+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
5381+
*/
5382+
@Experimental
5383+
public final <R> Observable<R> concatMapEager(Func1<? super T, ? extends Observable<? extends R>> mapper, int capacityHint, int maxConcurrent) {
5384+
if (capacityHint < 1) {
5385+
throw new IllegalArgumentException("capacityHint > 0 required but it was " + capacityHint);
5386+
}
5387+
if (maxConcurrent < 1) {
5388+
throw new IllegalArgumentException("maxConcurrent > 0 required but it was " + capacityHint);
5389+
}
5390+
return lift(new OperatorEagerConcatMap<T, R>(mapper, capacityHint, maxConcurrent));
53575391
}
53585392

53595393
/**

src/main/java/rx/internal/operators/OperatorEagerConcatMap.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,16 @@
3131
public final class OperatorEagerConcatMap<T, R> implements Operator<R, T> {
3232
final Func1<? super T, ? extends Observable<? extends R>> mapper;
3333
final int bufferSize;
34-
public OperatorEagerConcatMap(Func1<? super T, ? extends Observable<? extends R>> mapper, int bufferSize) {
34+
private final int maxConcurrent;
35+
public OperatorEagerConcatMap(Func1<? super T, ? extends Observable<? extends R>> mapper, int bufferSize, int maxConcurrent) {
3536
this.mapper = mapper;
3637
this.bufferSize = bufferSize;
38+
this.maxConcurrent = maxConcurrent;
3739
}
3840

3941
@Override
4042
public Subscriber<? super T> call(Subscriber<? super R> t) {
41-
EagerOuterSubscriber<T, R> outer = new EagerOuterSubscriber<T, R>(mapper, bufferSize, t);
43+
EagerOuterSubscriber<T, R> outer = new EagerOuterSubscriber<T, R>(mapper, bufferSize, maxConcurrent, t);
4244
outer.init();
4345
return outer;
4446
}
@@ -82,12 +84,13 @@ static final class EagerOuterSubscriber<T, R> extends Subscriber<T> {
8284
private EagerOuterProducer sharedProducer;
8385

8486
public EagerOuterSubscriber(Func1<? super T, ? extends Observable<? extends R>> mapper, int bufferSize,
85-
Subscriber<? super R> actual) {
87+
int maxConcurrent, Subscriber<? super R> actual) {
8688
this.mapper = mapper;
8789
this.bufferSize = bufferSize;
8890
this.actual = actual;
8991
this.subscribers = new LinkedList<EagerInnerSubscriber<R>>();
9092
this.wip = new AtomicInteger();
93+
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
9194
}
9295

9396
void init() {
@@ -223,6 +226,7 @@ void drain() {
223226
}
224227
innerSubscriber.unsubscribe();
225228
innerDone = true;
229+
request(1);
226230
break;
227231
}
228232
}

src/test/java/rx/internal/operators/OperatorEagerConcatMapTest.java

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package rx.internal.operators;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.concurrent.atomic.*;
2022

2123
import org.junit.*;
@@ -302,11 +304,16 @@ public Observable<Integer> call(Integer t) {
302304
ts.assertNotCompleted();
303305
ts.assertError(TestException.class);
304306
}
305-
307+
306308
@Test(expected = IllegalArgumentException.class)
307309
public void testInvalidCapacityHint() {
308310
Observable.just(1).concatMapEager(toJust, 0);
309311
}
312+
313+
@Test(expected = IllegalArgumentException.class)
314+
public void testInvalidMaxConcurrent() {
315+
Observable.just(1).concatMapEager(toJust, RxRingBuffer.SIZE, 0);
316+
}
310317

311318
@Test
312319
public void testBackpressure() {
@@ -397,17 +404,38 @@ public void call(Integer t) {
397404

398405
@Test
399406
public void testInnerNull() {
400-
TestSubscriber<Object> ts = TestSubscriber.create();
401-
402407
Observable.just(1).concatMapEager(new Func1<Integer, Observable<Integer>>() {
403408
@Override
404409
public Observable<Integer> call(Integer t) {
405410
return Observable.just(null);
406411
}
407412
}).subscribe(ts);
408-
413+
409414
ts.assertNoErrors();
410415
ts.assertCompleted();
411416
ts.assertValue(null);
412417
}
418+
419+
420+
@Test
421+
public void testMaxConcurrent5() {
422+
final List<Long> requests = new ArrayList<Long>();
423+
Observable.range(1, 100).doOnRequest(new Action1<Long>() {
424+
@Override
425+
public void call(Long reqCount) {
426+
requests.add(reqCount);
427+
}
428+
}).concatMapEager(toJust, RxRingBuffer.SIZE, 5).subscribe(ts);
429+
430+
ts.assertNoErrors();
431+
ts.assertValueCount(100);
432+
ts.assertCompleted();
433+
434+
Assert.assertEquals(5, (long) requests.get(0));
435+
Assert.assertEquals(1, (long) requests.get(1));
436+
Assert.assertEquals(1, (long) requests.get(2));
437+
Assert.assertEquals(1, (long) requests.get(3));
438+
Assert.assertEquals(1, (long) requests.get(4));
439+
Assert.assertEquals(1, (long) requests.get(5));
440+
}
413441
}

0 commit comments

Comments
 (0)