Skip to content

Commit 680f2ee

Browse files
committed
1.x: perf benchmark for the cost of subscribing.
This PR adds a perf test that measures what it takes to subscribe to a non-backpressured and backpressured source and what is the effect of the mini-arbitration inside Subscriber.
1 parent ca7f862 commit 680f2ee

File tree

1 file changed

+152
-0
lines changed

1 file changed

+152
-0
lines changed

src/perf/java/rx/SubscribingPerf.java

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import org.openjdk.jmh.annotations.*;
22+
import org.openjdk.jmh.infra.Blackhole;
23+
24+
/**
25+
* Benchmark the cost of subscription and initial request management.
26+
* <p>
27+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
28+
* <p>
29+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*SubscribingPerf.*"
30+
*/
31+
@BenchmarkMode(Mode.Throughput)
32+
@OutputTimeUnit(TimeUnit.SECONDS)
33+
@State(Scope.Thread)
34+
public class SubscribingPerf {
35+
36+
Observable<Integer> just = Observable.just(1);
37+
Observable<Integer> range = Observable.range(1, 2);
38+
39+
@Benchmark
40+
public void justDirect(Blackhole bh) {
41+
just.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
42+
}
43+
44+
@Benchmark
45+
public void justStarted(Blackhole bh) {
46+
just.subscribe(new StartedSubscriber<Integer>(Long.MAX_VALUE, bh));
47+
}
48+
49+
@Benchmark
50+
public void justUsual(Blackhole bh) {
51+
just.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
52+
}
53+
54+
@Benchmark
55+
public void rangeDirect(Blackhole bh) {
56+
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
57+
}
58+
59+
@Benchmark
60+
public void rangeStarted(Blackhole bh) {
61+
range.subscribe(new DirectSubscriber<Integer>(Long.MAX_VALUE, bh));
62+
}
63+
64+
@Benchmark
65+
public void rangeUsual(Blackhole bh) {
66+
range.subscribe(new UsualSubscriber<Integer>(Long.MAX_VALUE, bh));
67+
}
68+
69+
70+
static final class DirectSubscriber<T> extends Subscriber<T> {
71+
final long r;
72+
final Blackhole bh;
73+
public DirectSubscriber(long r, Blackhole bh) {
74+
this.r = r;
75+
this.bh = bh;
76+
}
77+
@Override
78+
public void onNext(T t) {
79+
bh.consume(t);
80+
}
81+
82+
@Override
83+
public void onError(Throwable e) {
84+
e.printStackTrace();
85+
}
86+
87+
@Override
88+
public void onCompleted() {
89+
}
90+
91+
@Override
92+
public void setProducer(Producer p) {
93+
p.request(r);
94+
}
95+
}
96+
97+
static final class StartedSubscriber<T> extends Subscriber<T> {
98+
final long r;
99+
final Blackhole bh;
100+
public StartedSubscriber(long r, Blackhole bh) {
101+
this.r = r;
102+
this.bh = bh;
103+
}
104+
105+
@Override
106+
public void onStart() {
107+
request(r);
108+
}
109+
110+
@Override
111+
public void onNext(T t) {
112+
bh.consume(t);
113+
}
114+
115+
@Override
116+
public void onError(Throwable e) {
117+
e.printStackTrace();
118+
}
119+
120+
@Override
121+
public void onCompleted() {
122+
123+
}
124+
}
125+
126+
/**
127+
* This requests in the constructor.
128+
* @param <T> the value type
129+
*/
130+
static final class UsualSubscriber<T> extends Subscriber<T> {
131+
final Blackhole bh;
132+
public UsualSubscriber(long r, Blackhole bh) {
133+
this.bh = bh;
134+
request(r);
135+
}
136+
137+
@Override
138+
public void onNext(T t) {
139+
bh.consume(t);
140+
}
141+
142+
@Override
143+
public void onError(Throwable e) {
144+
e.printStackTrace();
145+
}
146+
147+
@Override
148+
public void onCompleted() {
149+
150+
}
151+
}
152+
}

0 commit comments

Comments
 (0)