Skip to content

Commit 44947d9

Browse files
committed
1.x: benchmark just and Single (#3982)
* 1.x: benchmark just and Single * Adjust header date
1 parent 9710f06 commit 44947d9

File tree

3 files changed

+341
-0
lines changed

3 files changed

+341
-0
lines changed

src/perf/java/rx/OneItemPerf.java

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.openjdk.jmh.annotations.*;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
import rx.Observable.OnSubscribe;
25+
import rx.functions.Func1;
26+
import rx.internal.producers.SingleProducer;
27+
import rx.jmh.*;
28+
29+
/**
30+
* Benchmark operators working on a one-item source.
31+
* <p>
32+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OneItemPerf.*"
33+
* <p>
34+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*OneItemPerf.*"
35+
*/
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.SECONDS)
38+
@State(Scope.Thread)
39+
public class OneItemPerf {
40+
41+
Observable<Integer> scalar;
42+
Observable<Integer> scalarHidden;
43+
Observable<Integer> one;
44+
Single<Integer> single;
45+
Single<Integer> singleHidden;
46+
47+
Observable<Integer> scalarConcat;
48+
Observable<Integer> scalarHiddenConcat;
49+
Observable<Integer> oneConcat;
50+
51+
Observable<Integer> scalarMerge;
52+
Observable<Integer> scalarHiddenMerge;
53+
Observable<Integer> oneMerge;
54+
Single<Integer> singleMerge;
55+
Single<Integer> singleHiddenMerge;
56+
57+
Observable<Integer> scalarSwitch;
58+
Observable<Integer> scalarHiddenSwitch;
59+
Observable<Integer> oneSwitch;
60+
61+
<T> Single<T> hide(final Single<T> single) {
62+
return Single.create(new Single.OnSubscribe<T>() {
63+
@Override
64+
public void call(SingleSubscriber<? super T> t) {
65+
single.subscribe(t);
66+
}
67+
});
68+
}
69+
70+
@Setup
71+
public void setup() {
72+
scalar = Observable.just(1);
73+
one = Observable.create(new OnSubscribe<Integer>() {
74+
@Override
75+
public void call(Subscriber<? super Integer> t) {
76+
t.setProducer(new SingleProducer<Integer>(t, 1));
77+
}
78+
});
79+
single = Single.just(1);
80+
singleHidden = hide(single);
81+
scalarHidden = scalar.asObservable();
82+
83+
// ----------------------------------------------------------------------------
84+
85+
scalarConcat = scalar.concatMap(new Func1<Integer, Observable<Integer>>() {
86+
@Override
87+
public Observable<Integer> call(Integer v) {
88+
return scalar;
89+
}
90+
});
91+
scalarHiddenConcat = scalarHidden.concatMap(new Func1<Integer, Observable<Integer>>() {
92+
@Override
93+
public Observable<Integer> call(Integer v) {
94+
return scalar;
95+
}
96+
});
97+
98+
oneConcat = one.concatMap(new Func1<Integer, Observable<Integer>>() {
99+
@Override
100+
public Observable<Integer> call(Integer v) {
101+
return scalar;
102+
}
103+
});
104+
105+
// ----------------------------------------------------------------------------
106+
107+
scalarMerge = scalar.flatMap(new Func1<Integer, Observable<Integer>>() {
108+
@Override
109+
public Observable<Integer> call(Integer v) {
110+
return scalar;
111+
}
112+
});
113+
scalarHiddenMerge = scalarHidden.flatMap(new Func1<Integer, Observable<Integer>>() {
114+
@Override
115+
public Observable<Integer> call(Integer v) {
116+
return scalar;
117+
}
118+
});
119+
120+
oneMerge = one.flatMap(new Func1<Integer, Observable<Integer>>() {
121+
@Override
122+
public Observable<Integer> call(Integer v) {
123+
return scalar;
124+
}
125+
});
126+
singleMerge = single.flatMap(new Func1<Integer, Single<Integer>>() {
127+
@Override
128+
public Single<Integer> call(Integer v) {
129+
return single;
130+
}
131+
});
132+
singleHiddenMerge = hide(single).flatMap(new Func1<Integer, Single<Integer>>() {
133+
@Override
134+
public Single<Integer> call(Integer v) {
135+
return single;
136+
}
137+
});
138+
139+
// ----------------------------------------------------------------------------
140+
141+
scalarSwitch = scalar.switchMap(new Func1<Integer, Observable<Integer>>() {
142+
@Override
143+
public Observable<Integer> call(Integer v) {
144+
return scalar;
145+
}
146+
});
147+
scalarHiddenSwitch = scalarHidden.switchMap(new Func1<Integer, Observable<Integer>>() {
148+
@Override
149+
public Observable<Integer> call(Integer v) {
150+
return scalar;
151+
}
152+
});
153+
154+
oneSwitch = one.switchMap(new Func1<Integer, Observable<Integer>>() {
155+
@Override
156+
public Observable<Integer> call(Integer v) {
157+
return scalar;
158+
}
159+
});
160+
}
161+
162+
@Benchmark
163+
public void scalar(Blackhole bh) {
164+
scalar.subscribe(new LatchedObserver<Integer>(bh));
165+
}
166+
@Benchmark
167+
public void scalarHidden(Blackhole bh) {
168+
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
169+
}
170+
@Benchmark
171+
public void one(Blackhole bh) {
172+
one.subscribe(new LatchedObserver<Integer>(bh));
173+
}
174+
@Benchmark
175+
public void single(Blackhole bh) {
176+
single.subscribe(new PerfSingleSubscriber(bh));
177+
}
178+
@Benchmark
179+
public void singleHidden(Blackhole bh) {
180+
singleHidden.subscribe(new PerfSingleSubscriber(bh));
181+
}
182+
183+
@Benchmark
184+
public void scalarConcat(Blackhole bh) {
185+
scalarConcat.subscribe(new LatchedObserver<Integer>(bh));
186+
}
187+
@Benchmark
188+
public void scalarHiddenConcat(Blackhole bh) {
189+
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
190+
}
191+
@Benchmark
192+
public void oneConcat(Blackhole bh) {
193+
oneConcat.subscribe(new LatchedObserver<Integer>(bh));
194+
}
195+
196+
@Benchmark
197+
public void scalarMerge(Blackhole bh) {
198+
scalarMerge.subscribe(new LatchedObserver<Integer>(bh));
199+
}
200+
@Benchmark
201+
public void scalarHiddenMerge(Blackhole bh) {
202+
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
203+
}
204+
@Benchmark
205+
public void oneMerge(Blackhole bh) {
206+
oneMerge.subscribe(new LatchedObserver<Integer>(bh));
207+
}
208+
@Benchmark
209+
public void singleMerge(Blackhole bh) {
210+
single.subscribe(new PerfSingleSubscriber(bh));
211+
}
212+
@Benchmark
213+
public void singleHiddenMerge(Blackhole bh) {
214+
singleHiddenMerge.subscribe(new PerfSingleSubscriber(bh));
215+
}
216+
217+
@Benchmark
218+
public void scalarSwitch(Blackhole bh) {
219+
scalarSwitch.subscribe(new LatchedObserver<Integer>(bh));
220+
}
221+
@Benchmark
222+
public void scalarHiddenSwitch(Blackhole bh) {
223+
scalarHiddenSwitch.subscribe(new LatchedObserver<Integer>(bh));
224+
}
225+
@Benchmark
226+
public void oneSwitch(Blackhole bh) {
227+
oneSwitch.subscribe(new LatchedObserver<Integer>(bh));
228+
}
229+
230+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.jmh;
17+
18+
import java.util.concurrent.CountDownLatch;
19+
20+
import org.openjdk.jmh.infra.Blackhole;
21+
22+
import rx.SingleSubscriber;
23+
24+
/**
25+
* A SingleSubscriber implementation for asynchronous benchmarks that sends
26+
* the onSuccess and onError signals to a JMH Blackhole.
27+
* <p>
28+
* Use {@code sleepAwait} or {@code spinAwait}.
29+
*/
30+
public final class PerfAsyncSingleSubscriber extends SingleSubscriber<Object> {
31+
final Blackhole bh;
32+
33+
final CountDownLatch cdl;
34+
35+
public PerfAsyncSingleSubscriber(Blackhole bh) {
36+
this.bh = bh;
37+
this.cdl = new CountDownLatch(1);
38+
}
39+
40+
@Override
41+
public void onSuccess(Object value) {
42+
bh.consume(value);
43+
cdl.countDown();
44+
}
45+
46+
@Override
47+
public void onError(Throwable error) {
48+
bh.consume(error);
49+
cdl.countDown();
50+
}
51+
52+
/**
53+
* Sleeps until the subscriber receives an event.
54+
*/
55+
public void sleepAwait() {
56+
try {
57+
cdl.await();
58+
} catch (InterruptedException ex) {
59+
throw new RuntimeException(ex);
60+
}
61+
}
62+
63+
/**
64+
* Spins until the subscriber receives an events.
65+
*/
66+
public void spinAwait() {
67+
while (cdl.getCount() != 0) ;
68+
}
69+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.jmh;
17+
18+
import org.openjdk.jmh.infra.Blackhole;
19+
20+
import rx.SingleSubscriber;
21+
22+
/**
23+
* A SingleSubscriber implementation for synchronous benchmarks that sends
24+
* the onSuccess and onError signals to a JMH Blackhole.
25+
*/
26+
public final class PerfSingleSubscriber extends SingleSubscriber<Object> {
27+
final Blackhole bh;
28+
29+
public PerfSingleSubscriber(Blackhole bh) {
30+
this.bh = bh;
31+
}
32+
33+
@Override
34+
public void onSuccess(Object value) {
35+
bh.consume(value);
36+
}
37+
38+
@Override
39+
public void onError(Throwable error) {
40+
bh.consume(error);
41+
}
42+
}

0 commit comments

Comments
 (0)