Skip to content

Commit 5576628

Browse files
committed
Merge pull request #3478 from akarnokd/SubscribingPerf1x
1.x: perf benchmark for the cost of subscribing
2 parents ca7f862 + 1d643e1 commit 5576628

File tree

1 file changed

+182
-0
lines changed

1 file changed

+182
-0
lines changed

src/perf/java/rx/SubscribingPerf.java

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.openjdk.jmh.annotations.*;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
/**
25+
* Benchmark the cost of subscription and initial request management.
26+
* <p>
27+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
28+
* <p>
29+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
30+
*/
31+
@BenchmarkMode(Mode.Throughput)
32+
@OutputTimeUnit(TimeUnit.SECONDS)
33+
@State(Scope.Thread)
34+
public class SubscribingPerf {
35+
36+
Observable<Integer> just = Observable.just(1);
37+
Observable<Integer> range = Observable.range(1, 2);
38+
39+
@Benchmark
40+
public void justDirect(Blackhole bh) {
41+
just.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
42+
}
43+
44+
@Benchmark
45+
public void justStarted(Blackhole bh) {
46+
just.subscribe(new StartedSubscriber<Integer>(Long.MAX_VALUE, bh));
47+
}
48+
49+
@Benchmark
50+
public void justUsual(Blackhole bh) {
51+
just.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
52+
}
53+
54+
@Benchmark
55+
public void rangeDirect(Blackhole bh) {
56+
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
57+
}
58+
59+
@Benchmark
60+
public void rangeStarted(Blackhole bh) {
61+
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
62+
}
63+
64+
@Benchmark
65+
public void rangeUsual(Blackhole bh) {
66+
range.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
67+
}
68+
69+
@Benchmark
70+
public void justDirectUnsafe(Blackhole bh) {
71+
just.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
72+
}
73+
74+
@Benchmark
75+
public void justStartedUnsafe(Blackhole bh) {
76+
just.unsafeSubscribe(new StartedSubscriber<Integer>(Long.MAX_VALUE, bh));
77+
}
78+
79+
@Benchmark
80+
public void justUsualUnsafe(Blackhole bh) {
81+
just.unsafeSubscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
82+
}
83+
84+
@Benchmark
85+
public void rangeDirectUnsafe(Blackhole bh) {
86+
range.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
87+
}
88+
89+
@Benchmark
90+
public void rangeStartedUnsafe(Blackhole bh) {
91+
range.unsafeSubscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
92+
}
93+
94+
@Benchmark
95+
public void rangeUsualUnsafe(Blackhole bh) {
96+
range.unsafeSubscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
97+
}
98+
99+
100+
static final class DirectSubscriber<T> extends Subscriber<T> {
101+
final long r;
102+
final Blackhole bh;
103+
public DirectSubscriber(long r, Blackhole bh) {
104+
this.r = r;
105+
this.bh = bh;
106+
}
107+
@Override
108+
public void onNext(T t) {
109+
bh.consume(t);
110+
}
111+
112+
@Override
113+
public void onError(Throwable e) {
114+
e.printStackTrace();
115+
}
116+
117+
@Override
118+
public void onCompleted() {
119+
}
120+
121+
@Override
122+
public void setProducer(Producer p) {
123+
p.request(r);
124+
}
125+
}
126+
127+
static final class StartedSubscriber<T> extends Subscriber<T> {
128+
final long r;
129+
final Blackhole bh;
130+
public StartedSubscriber(long r, Blackhole bh) {
131+
this.r = r;
132+
this.bh = bh;
133+
}
134+
135+
@Override
136+
public void onStart() {
137+
request(r);
138+
}
139+
140+
@Override
141+
public void onNext(T t) {
142+
bh.consume(t);
143+
}
144+
145+
@Override
146+
public void onError(Throwable e) {
147+
e.printStackTrace();
148+
}
149+
150+
@Override
151+
public void onCompleted() {
152+
153+
}
154+
}
155+
156+
/**
157+
* This requests in the constructor.
158+
* @param <T> the value type
159+
*/
160+
static final class UsualSubscriber<T> extends Subscriber<T> {
161+
final Blackhole bh;
162+
public UsualSubscriber(long r, Blackhole bh) {
163+
this.bh = bh;
164+
request(r);
165+
}
166+
167+
@Override
168+
public void onNext(T t) {
169+
bh.consume(t);
170+
}
171+
172+
@Override
173+
public void onError(Throwable e) {
174+
e.printStackTrace();
175+
}
176+
177+
@Override
178+
public void onCompleted() {
179+
180+
}
181+
}
182+
}

0 commit comments

Comments
 (0)