Skip to content

Commit 5b75d32

Browse files
committed
Merge pull request #2911 from akarnokd/OperatorPublishPerf
OperatorPublish benchmark
2 parents b31112a + 9759e63 commit 5b75d32

File tree

1 file changed

+159
-0
lines changed

1 file changed

+159
-0
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import java.util.*;
20+
import java.util.concurrent.*;
21+
22+
import org.openjdk.jmh.annotations.*;
23+
import org.openjdk.jmh.infra.Blackhole;
24+
25+
import rx.*;
26+
import rx.Observable;
27+
import rx.functions.Action0;
28+
import rx.observables.ConnectableObservable;
29+
import rx.schedulers.Schedulers;
30+
31+
/**
32+
* Benchmark typical atomic operations on volatile fields and AtomicXYZ classes.
33+
* <p>
34+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorPublishPerf.*"
35+
* <p>
36+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*OperatorPublishPerf.*"
37+
*/
38+
@BenchmarkMode(Mode.Throughput)
39+
@OutputTimeUnit(TimeUnit.SECONDS)
40+
@State(Scope.Thread)
41+
public class OperatorPublishPerf {
42+
static final class SharedLatchObserver extends Subscriber<Integer> {
43+
final CountDownLatch cdl;
44+
final int batchFrequency;
45+
final Blackhole bh;
46+
int received;
47+
public SharedLatchObserver(CountDownLatch cdl, int batchFrequency, Blackhole bh) {
48+
this.cdl = cdl;
49+
this.batchFrequency = batchFrequency;
50+
this.bh = bh;
51+
}
52+
@Override
53+
public void onStart() {
54+
request(batchFrequency);
55+
}
56+
@Override
57+
public void onNext(Integer t) {
58+
if (bh != null) {
59+
bh.consume(t);
60+
}
61+
if (++received == batchFrequency) {
62+
received = 0;
63+
request(batchFrequency);
64+
}
65+
}
66+
@Override
67+
public void onError(Throwable e) {
68+
e.printStackTrace();
69+
cdl.countDown();
70+
}
71+
@Override
72+
public void onCompleted() {
73+
cdl.countDown();
74+
}
75+
}
76+
77+
78+
/** How long the range should be. */
79+
@Param({"1", "1000", "1000000"})
80+
private int size;
81+
/** Should children use observeOn? */
82+
@Param({"false", "true"})
83+
private boolean async;
84+
/** Number of child subscribers. */
85+
@Param({"0", "1", "2", "3", "4", "5"})
86+
private int childCount;
87+
/** How often the child subscribers should re-request. */
88+
@Param({"1", "2", "4", "8", "16", "32", "64"})
89+
private int batchFrequency;
90+
91+
private ConnectableObservable<Integer> source;
92+
private Observable<Integer> observable;
93+
private CyclicBarrier sourceDone;
94+
@Setup
95+
public void setup() {
96+
List<Integer> list = new ArrayList<Integer>();
97+
for (int i = 0; i < size; i++) {
98+
list.add(i);
99+
}
100+
Observable<Integer> src = Observable.from(list);
101+
if (childCount == 0) {
102+
sourceDone = new CyclicBarrier(2);
103+
src = src
104+
// for childCount == 0, make sure we measure how fast the source is depleted
105+
.doOnCompleted(new Action0() {
106+
@Override
107+
public void call() {
108+
try {
109+
sourceDone.await(2, TimeUnit.SECONDS);
110+
} catch (InterruptedException ex) {
111+
// ignored
112+
} catch (BrokenBarrierException ex) {
113+
// ignored
114+
} catch (TimeoutException ex) {
115+
// ignored
116+
}
117+
}
118+
});
119+
}
120+
source = src.publish();
121+
observable = async ? source.observeOn(Schedulers.computation()) : source;
122+
}
123+
124+
@Benchmark
125+
public void benchmark(Blackhole bh) throws InterruptedException,
126+
TimeoutException, BrokenBarrierException {
127+
CountDownLatch completion = null;
128+
int cc = childCount;
129+
130+
if (cc > 0) {
131+
completion = new CountDownLatch(cc);
132+
Observable<Integer> o = observable;
133+
for (int i = 0; i < childCount; i++) {
134+
o.subscribe(new SharedLatchObserver(completion, batchFrequency, bh));
135+
}
136+
}
137+
138+
Subscription s = source.connect();
139+
140+
if (cc == 0) {
141+
sourceDone.await(2, TimeUnit.SECONDS);
142+
}
143+
if (completion != null && !completion.await(2, TimeUnit.SECONDS)) {
144+
throw new RuntimeException("Source hung!");
145+
}
146+
s.unsubscribe();
147+
}
148+
// public static void main(String[] args) throws Exception {
149+
// OperatorPublishPerf o = new OperatorPublishPerf();
150+
// o.async = true;
151+
// o.batchFrequency = 1;
152+
// o.childCount = 1;
153+
// o.size = 1000000;
154+
// o.setup();
155+
// for (int j = 0; j < 1000; j++) {
156+
// o.benchmark(null);
157+
// }
158+
// }
159+
}

0 commit comments

Comments
 (0)