Skip to content

Commit 93abe84

Browse files
committed
2.x: perf: comparison of range and flatMap between Observable,
NbpObservable and Single.
1 parent ba06df5 commit 93abe84

File tree

3 files changed

+182
-0
lines changed

3 files changed

+182
-0
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.*;
17+
18+
import org.openjdk.jmh.annotations.*;
19+
import org.openjdk.jmh.infra.Blackhole;
20+
21+
@BenchmarkMode(Mode.Throughput)
22+
@Warmup(iterations = 5)
23+
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
24+
@OutputTimeUnit(TimeUnit.SECONDS)
25+
@Fork(value = 1)
26+
@State(Scope.Thread)
27+
public class EachTypeFlatMapPerf {
28+
@Param({ "1", "1000", "1000000" })
29+
public int times;
30+
31+
Observable<Integer> bpRange;
32+
NbpObservable<Integer> nbpRange;
33+
Single<Integer> singleJust;
34+
35+
Observable<Integer> bpRangeMapJust;
36+
NbpObservable<Integer> nbpRangeMapJust;
37+
Single<Integer> singleJustMapJust;
38+
39+
Observable<Integer> bpRangeMapRange;
40+
NbpObservable<Integer> nbpRangeMapRange;
41+
42+
@Setup
43+
public void setup() {
44+
bpRange = Observable.range(1, times);
45+
nbpRange = NbpObservable.range(1, times);
46+
47+
bpRangeMapJust = bpRange.flatMap(Observable::just);
48+
nbpRangeMapJust = nbpRange.flatMap(NbpObservable::just);
49+
50+
bpRangeMapRange = bpRange.flatMap(v -> Observable.range(v, 2));
51+
nbpRangeMapRange = nbpRange.flatMap(v -> NbpObservable.range(v, 2));
52+
53+
singleJust = Single.just(1);
54+
singleJustMapJust = singleJust.flatMap(Single::just);
55+
}
56+
57+
@Benchmark
58+
public void bpRange(Blackhole bh) {
59+
bpRange.subscribe(new LatchedObserver<>(bh));
60+
}
61+
@Benchmark
62+
public void bpRangeMapJust(Blackhole bh) {
63+
bpRangeMapJust.subscribe(new LatchedObserver<>(bh));
64+
}
65+
@Benchmark
66+
public void bpRangeMapRange(Blackhole bh) {
67+
bpRangeMapRange.subscribe(new LatchedObserver<>(bh));
68+
}
69+
70+
@Benchmark
71+
public void nbpRange(Blackhole bh) {
72+
nbpRange.subscribe(new LatchedNbpObserver<>(bh));
73+
}
74+
@Benchmark
75+
public void nbpRangeMapJust(Blackhole bh) {
76+
nbpRangeMapJust.subscribe(new LatchedNbpObserver<>(bh));
77+
}
78+
@Benchmark
79+
public void nbpRangeMapRange(Blackhole bh) {
80+
nbpRangeMapRange.subscribe(new LatchedNbpObserver<>(bh));
81+
}
82+
83+
@Benchmark
84+
public void singleJust(Blackhole bh) {
85+
singleJust.subscribe(new LatchedSingleObserver<>(bh));
86+
}
87+
@Benchmark
88+
public void singleJustMapJust(Blackhole bh) {
89+
singleJustMapJust.subscribe(new LatchedSingleObserver<>(bh));
90+
}
91+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
18+
import org.openjdk.jmh.infra.Blackhole;
19+
20+
import io.reactivex.NbpObservable.NbpSubscriber;
21+
import io.reactivex.disposables.Disposable;
22+
23+
public final class LatchedNbpObserver<T> implements NbpSubscriber<T> {
24+
final CountDownLatch cdl;
25+
final Blackhole bh;
26+
public LatchedNbpObserver(Blackhole bh) {
27+
this.bh = bh;
28+
this.cdl = new CountDownLatch(1);
29+
}
30+
@Override
31+
public void onSubscribe(Disposable d) {
32+
33+
}
34+
@Override
35+
public void onNext(T value) {
36+
bh.consume(value);
37+
}
38+
@Override
39+
public void onError(Throwable e) {
40+
e.printStackTrace();
41+
cdl.countDown();
42+
}
43+
@Override
44+
public void onComplete() {
45+
cdl.countDown();
46+
}
47+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.CountDownLatch;
17+
18+
import org.openjdk.jmh.infra.Blackhole;
19+
20+
import io.reactivex.Single.SingleCallback;
21+
import io.reactivex.disposables.Disposable;
22+
23+
public final class LatchedSingleObserver<T> implements SingleCallback<T> {
24+
final CountDownLatch cdl;
25+
final Blackhole bh;
26+
public LatchedSingleObserver(Blackhole bh) {
27+
this.bh = bh;
28+
this.cdl = new CountDownLatch(1);
29+
}
30+
@Override
31+
public void onSubscribe(Disposable d) {
32+
33+
}
34+
@Override
35+
public void onSuccess(T value) {
36+
bh.consume(value);
37+
cdl.countDown();
38+
}
39+
@Override
40+
public void onFailure(Throwable e) {
41+
e.printStackTrace();
42+
cdl.countDown();
43+
}
44+
}

0 commit comments

Comments
 (0)