Skip to content

Microbenchmarks for tcp and local #184

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 2, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions reactivesocket-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ buildscript {
apply plugin: 'me.champeau.gradle.jmh'

jmh {
jmhVersion = '1.12'
jmhVersion = '1.15'
jvmArgs = '-XX:+UnlockCommercialFeatures -XX:+FlightRecorder'
profilers = ['gc']
zip64 = true
Expand All @@ -41,5 +41,5 @@ dependencies {

testCompile project(':reactivesocket-test')

jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.12'
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.15'
}
23 changes: 23 additions & 0 deletions reactivesocket-examples/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,36 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
buildscript {
repositories {
maven { url "https://plugins.gradle.org/m2/" }
}

dependencies {
classpath 'gradle.plugin.me.champeau.gradle:jmh-gradle-plugin:0.3.0'
}
}

apply plugin: 'me.champeau.gradle.jmh'

jmh {
jmhVersion = '1.15'
jvmArgs = '-XX:+UnlockCommercialFeatures -XX:+FlightRecorder'
profilers = ['gc']
zip64 = true
}

dependencies {
compile project(':reactivesocket-core')
compile project(':reactivesocket-client')
compile project(':reactivesocket-discovery-eureka')
compile project(':reactivesocket-stats-servo')
compile project(':reactivesocket-transport-tcp')
compile project(':reactivesocket-transport-local')

compile project(':reactivesocket-test')

compile 'org.slf4j:slf4j-log4j12:1.7.21'

jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.15'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.perf;

import io.reactivesocket.local.LocalClient;
import io.reactivesocket.local.LocalServer;
import io.reactivesocket.perf.util.AbstractMicrobenchmarkBase;
import io.reactivesocket.perf.util.BlackholeSubscriber;
import io.reactivesocket.perf.util.ClientServerHolder;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@State(Scope.Benchmark)
public class RequestResponsePerf extends AbstractMicrobenchmarkBase {

public static final String TRANSPORT_TCP = "tcp";
public static final String TRANSPORT_LOCAL = "local";

@Param({ TRANSPORT_TCP, TRANSPORT_LOCAL })
public String transport;

public Blackhole bh;

public ClientServerHolder localHolder;
public ClientServerHolder tcpHolder;

@Setup(Level.Trial)
public void setup(Blackhole bh) {
tcpHolder = ClientServerHolder.requestResponse(TcpTransportServer.create(),
socketAddress -> TcpTransportClient.create(socketAddress));
String clientName = "local-" + ThreadLocalRandom.current().nextInt();
localHolder = ClientServerHolder.requestResponse(LocalServer.create(clientName),
socketAddress -> LocalClient.create(clientName));
this.bh = bh;
}

@Benchmark
public void requestResponse() throws InterruptedException {
ClientServerHolder holder;
switch (transport) {
case TRANSPORT_LOCAL:
holder = localHolder;
break;
case TRANSPORT_TCP:
holder = tcpHolder;
break;
default:
throw new IllegalArgumentException("Unknown transport: " + transport);
}
requestResponse(holder);
}

protected void requestResponse(ClientServerHolder holder) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
holder.getClient().requestResponse(new PayloadImpl(ClientServerHolder.HELLO))
.subscribe(new BlackholeSubscriber<>(bh, () -> latch.countDown()));
latch.await();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.perf.util;

import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

import java.util.concurrent.TimeUnit;

/**
* Base class for all JMH benchmarks.
*/
@Warmup(iterations = AbstractMicrobenchmarkBase.DEFAULT_WARMUP_ITERATIONS)
@Measurement(iterations = AbstractMicrobenchmarkBase.DEFAULT_MEASURE_ITERATIONS,
batchSize = AbstractMicrobenchmarkBase.DEFAULT_WARMUP_ITERATIONS,
time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(AbstractMicrobenchmarkBase.DEFAULT_FORKS)
@State(Scope.Thread)
public abstract class AbstractMicrobenchmarkBase {

protected static final int DEFAULT_WARMUP_ITERATIONS = 10;
protected static final int DEFAULT_MEASURE_ITERATIONS = 10;
protected static final int DEFAULT_FORKS = 2;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.perf.util;

import io.reactivesocket.Payload;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class BlackholeSubscriber<T> implements Subscriber<T> {

private final Blackhole blackhole;
private final Runnable onTerminate;

public BlackholeSubscriber(Blackhole blackhole, Runnable onTerminate) {
this.blackhole = blackhole;
this.onTerminate = onTerminate;
}

@Override
public void onSubscribe(Subscription s) {
s.request(1);
}

@Override
public void onNext(T payload) {
blackhole.consume(payload);
}

@Override
public void onError(Throwable t) {
t.printStackTrace();
onTerminate.run();
}

@Override
public void onComplete() {
onTerminate.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2016 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.reactivesocket.perf.util;

import io.reactivesocket.AbstractReactiveSocket;
import io.reactivesocket.Payload;
import io.reactivesocket.ReactiveSocket;
import io.reactivesocket.client.KeepAliveProvider;
import io.reactivesocket.client.ReactiveSocketClient;
import io.reactivesocket.client.SetupProvider;
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
import io.reactivesocket.reactivestreams.extensions.Px;
import io.reactivesocket.server.ReactiveSocketServer;
import io.reactivesocket.transport.TransportClient;
import io.reactivesocket.transport.TransportServer;
import io.reactivesocket.transport.TransportServer.StartedServer;
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
import io.reactivesocket.util.PayloadImpl;
import io.reactivex.Flowable;
import io.reactivex.Observable;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;

import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.function.Function;

public class ClientServerHolder {

public static final byte[] HELLO = "HELLO".getBytes(StandardCharsets.UTF_8);

private final StartedServer server;
private final ReactiveSocket client;

public ClientServerHolder(TransportServer transportServer, Function<SocketAddress, TransportClient> clientFactory,
ReactiveSocket handler) {
server = ReactiveSocketServer.create(transportServer)
.start((setup, sendingSocket) -> {
return new DisabledLeaseAcceptingSocket(handler);
});
SetupProvider setupProvider = SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease();
ReactiveSocketClient client =
ReactiveSocketClient.create(clientFactory.apply(server.getServerAddress()), setupProvider);
this.client = Flowable.fromPublisher(client.connect()).blockingLast();
}

public ReactiveSocket getClient() {
return client;
}

public static ClientServerHolder requestResponse(TransportServer transportServer,
Function<SocketAddress, TransportClient> clientFactory) {
return new ClientServerHolder(transportServer, clientFactory, new AbstractReactiveSocket() {
@Override
public Publisher<Payload> requestResponse(Payload payload) {
return Px.just(new PayloadImpl(HELLO));
}
});
}
}