Skip to content

1.x: benchmark just and Single #3982

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 1, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 230 additions & 0 deletions src/perf/java/rx/OneItemPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package rx;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import rx.Observable.OnSubscribe;
import rx.functions.Func1;
import rx.internal.producers.SingleProducer;
import rx.jmh.*;

/**
* Benchmark operators working on a one-item source.
* <p>
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OneItemPerf.*"
* <p>
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*OneItemPerf.*"
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Thread)
public class OneItemPerf {

Observable<Integer> scalar;
Observable<Integer> scalarHidden;
Observable<Integer> one;
Single<Integer> single;
Single<Integer> singleHidden;

Observable<Integer> scalarConcat;
Observable<Integer> scalarHiddenConcat;
Observable<Integer> oneConcat;

Observable<Integer> scalarMerge;
Observable<Integer> scalarHiddenMerge;
Observable<Integer> oneMerge;
Single<Integer> singleMerge;
Single<Integer> singleHiddenMerge;

Observable<Integer> scalarSwitch;
Observable<Integer> scalarHiddenSwitch;
Observable<Integer> oneSwitch;

<T> Single<T> hide(final Single<T> single) {
return Single.create(new Single.OnSubscribe<T>() {
@Override
public void call(SingleSubscriber<? super T> t) {
single.subscribe(t);
}
});
}

@Setup
public void setup() {
scalar = Observable.just(1);
one = Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> t) {
t.setProducer(new SingleProducer<Integer>(t, 1));
}
});
single = Single.just(1);
singleHidden = hide(single);
scalarHidden = scalar.asObservable();

// ----------------------------------------------------------------------------

scalarConcat = scalar.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});
scalarHiddenConcat = scalarHidden.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});

oneConcat = one.concatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});

// ----------------------------------------------------------------------------

scalarMerge = scalar.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});
scalarHiddenMerge = scalarHidden.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});

oneMerge = one.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});
singleMerge = single.flatMap(new Func1<Integer, Single<Integer>>() {
@Override
public Single<Integer> call(Integer v) {
return single;
}
});
singleHiddenMerge = hide(single).flatMap(new Func1<Integer, Single<Integer>>() {
@Override
public Single<Integer> call(Integer v) {
return single;
}
});

// ----------------------------------------------------------------------------

scalarSwitch = scalar.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});
scalarHiddenSwitch = scalarHidden.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});

oneSwitch = one.switchMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
return scalar;
}
});
}

@Benchmark
public void scalar(Blackhole bh) {
scalar.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void scalarHidden(Blackhole bh) {
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void one(Blackhole bh) {
one.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void single(Blackhole bh) {
single.subscribe(new PerfSingleSubscriber(bh));
}
@Benchmark
public void singleHidden(Blackhole bh) {
singleHidden.subscribe(new PerfSingleSubscriber(bh));
}

@Benchmark
public void scalarConcat(Blackhole bh) {
scalarConcat.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void scalarHiddenConcat(Blackhole bh) {
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void oneConcat(Blackhole bh) {
oneConcat.subscribe(new LatchedObserver<Integer>(bh));
}

@Benchmark
public void scalarMerge(Blackhole bh) {
scalarMerge.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void scalarHiddenMerge(Blackhole bh) {
scalarHidden.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void oneMerge(Blackhole bh) {
oneMerge.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void singleMerge(Blackhole bh) {
single.subscribe(new PerfSingleSubscriber(bh));
}
@Benchmark
public void singleHiddenMerge(Blackhole bh) {
singleHiddenMerge.subscribe(new PerfSingleSubscriber(bh));
}

@Benchmark
public void scalarSwitch(Blackhole bh) {
scalarSwitch.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void scalarHiddenSwitch(Blackhole bh) {
scalarHiddenSwitch.subscribe(new LatchedObserver<Integer>(bh));
}
@Benchmark
public void oneSwitch(Blackhole bh) {
oneSwitch.subscribe(new LatchedObserver<Integer>(bh));
}

}
69 changes: 69 additions & 0 deletions src/perf/java/rx/jmh/PerfAsyncSingleSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.jmh;

import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.infra.Blackhole;

import rx.SingleSubscriber;

/**
* A SingleSubscriber implementation for asynchronous benchmarks that sends
* the onSuccess and onError signals to a JMH Blackhole.
* <p>
* Use {@code sleepAwait} or {@code spinAwait}.
*/
public final class PerfAsyncSingleSubscriber extends SingleSubscriber<Object> {
final Blackhole bh;

final CountDownLatch cdl;

public PerfAsyncSingleSubscriber(Blackhole bh) {
this.bh = bh;
this.cdl = new CountDownLatch(1);
}

@Override
public void onSuccess(Object value) {
bh.consume(value);
cdl.countDown();
}

@Override
public void onError(Throwable error) {
bh.consume(error);
cdl.countDown();
}

/**
* Sleeps until the subscriber receives an event.
*/
public void sleepAwait() {
try {
cdl.await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

/**
* Spins until the subscriber receives an events.
*/
public void spinAwait() {
while (cdl.getCount() != 0) ;
}
}
42 changes: 42 additions & 0 deletions src/perf/java/rx/jmh/PerfSingleSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.jmh;

import org.openjdk.jmh.infra.Blackhole;

import rx.SingleSubscriber;

/**
* A SingleSubscriber implementation for synchronous benchmarks that sends
* the onSuccess and onError signals to a JMH Blackhole.
*/
public final class PerfSingleSubscriber extends SingleSubscriber<Object> {
final Blackhole bh;

public PerfSingleSubscriber(Blackhole bh) {
this.bh = bh;
}

@Override
public void onSuccess(Object value) {
bh.consume(value);
}

@Override
public void onError(Throwable error) {
bh.consume(error);
}
}