Skip to content

Commit ba06df5

Browse files
committed
Merge pull request #3354 from akarnokd/PerfAwaitFix2x
2.x: perf change wait to spin-loop for short async benchmarks
2 parents 9657f50 + 046786a commit ba06df5

File tree

3 files changed

+47
-9
lines changed

3 files changed

+47
-9
lines changed

src/perf/java/io/reactivex/OperatorFlatMapPerf.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ public void flatMapIntPassthruAsync(Input input) throws InterruptedException {
4747
input.observable.flatMap(i -> {
4848
return Observable.just(i).subscribeOn(Schedulers.computation());
4949
}).subscribe(latchedObserver);
50-
latchedObserver.latch.await();
50+
if (input.size == 1) {
51+
while (latchedObserver.latch.getCount() != 0);
52+
} else {
53+
latchedObserver.latch.await();
54+
}
5155
}
5256

5357
@Benchmark

src/perf/java/io/reactivex/OperatorMergePerf.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,12 @@ public void oneStreamOfNthatMergesIn1(final InputMillion input) throws Interrupt
3131
Observable<Observable<Integer>> os = Observable.range(1, input.size).map(Observable::just);
3232
LatchedObserver<Integer> o = input.newLatchedObserver();
3333
Observable.merge(os).subscribe(o);
34-
o.latch.await();
34+
35+
if (input.size == 1) {
36+
while (o.latch.getCount() != 0);
37+
} else {
38+
o.latch.await();
39+
}
3540
}
3641

3742
// flatMap
@@ -42,7 +47,12 @@ public void merge1SyncStreamOfN(final InputMillion input) throws InterruptedExce
4247
});
4348
LatchedObserver<Integer> o = input.newLatchedObserver();
4449
Observable.merge(os).subscribe(o);
45-
o.latch.await();
50+
51+
if (input.size == 1) {
52+
while (o.latch.getCount() != 0);
53+
} else {
54+
o.latch.await();
55+
}
4656
}
4757

4858
@Benchmark
@@ -52,7 +62,11 @@ public void mergeNSyncStreamsOfN(final InputThousand input) throws InterruptedEx
5262
});
5363
LatchedObserver<Integer> o = input.newLatchedObserver();
5464
Observable.merge(os).subscribe(o);
55-
o.latch.await();
65+
if (input.size == 1) {
66+
while (o.latch.getCount() != 0);
67+
} else {
68+
o.latch.await();
69+
}
5670
}
5771

5872
@Benchmark
@@ -62,22 +76,34 @@ public void mergeNAsyncStreamsOfN(final InputThousand input) throws InterruptedE
6276
});
6377
LatchedObserver<Integer> o = input.newLatchedObserver();
6478
Observable.merge(os).subscribe(o);
65-
o.latch.await();
79+
if (input.size == 1) {
80+
while (o.latch.getCount() != 0);
81+
} else {
82+
o.latch.await();
83+
}
6684
}
6785

6886
@Benchmark
6987
public void mergeTwoAsyncStreamsOfN(final InputThousand input) throws InterruptedException {
7088
LatchedObserver<Integer> o = input.newLatchedObserver();
7189
Observable<Integer> ob = Observable.range(0, input.size).subscribeOn(Schedulers.computation());
7290
Observable.merge(ob, ob).subscribe(o);
73-
o.latch.await();
91+
if (input.size == 1) {
92+
while (o.latch.getCount() != 0);
93+
} else {
94+
o.latch.await();
95+
}
7496
}
7597

7698
@Benchmark
7799
public void mergeNSyncStreamsOf1(final InputForMergeN input) throws InterruptedException {
78100
LatchedObserver<Integer> o = input.newLatchedObserver();
79101
Observable.merge(input.observables).subscribe(o);
80-
o.latch.await();
102+
if (input.size == 1) {
103+
while (o.latch.getCount() != 0);
104+
} else {
105+
o.latch.await();
106+
}
81107
}
82108

83109
@State(Scope.Thread)

src/perf/java/io/reactivex/RangePerf.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,11 @@ public void rangeAsync(Blackhole bh) throws Exception {
6161

6262
rangeAsync.subscribe(lo);
6363

64-
lo.latch.await();
64+
if (times == 1) {
65+
while (lo.latch.getCount() != 0);
66+
} else {
67+
lo.latch.await();
68+
}
6569
}
6670

6771
@Benchmark
@@ -70,7 +74,11 @@ public void rangePipeline(Blackhole bh) throws Exception {
7074

7175
rangeAsyncPipeline.subscribe(lo);
7276

73-
lo.latch.await();
77+
if (times == 1) {
78+
while (lo.latch.getCount() != 0);
79+
} else {
80+
lo.latch.await();
81+
}
7482
}
7583

7684
}

0 commit comments

Comments
 (0)