Skip to content

Commit e2e20e8

Browse files
NiteshKantstevegury
authored andcommitted
Microbenchmarks for tcp and local (#184)
#### Problem No benchmarks for transports. #### Modification Added a benchmark to compare request-response across multiple transports. Added local and tcp for now. #### Result Benchmark to give some idea about how different changes impact the perf.
1 parent ccde433 commit e2e20e8

File tree

6 files changed

+275
-2
lines changed

6 files changed

+275
-2
lines changed

reactivesocket-core/build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ buildscript {
2727
apply plugin: 'me.champeau.gradle.jmh'
2828

2929
jmh {
30-
jmhVersion = '1.12'
30+
jmhVersion = '1.15'
3131
jvmArgs = '-XX:+UnlockCommercialFeatures -XX:+FlightRecorder'
3232
profilers = ['gc']
3333
zip64 = true
@@ -41,5 +41,5 @@ dependencies {
4141

4242
testCompile project(':reactivesocket-test')
4343

44-
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.12'
44+
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.15'
4545
}

reactivesocket-examples/build.gradle

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,36 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
buildscript {
17+
repositories {
18+
maven { url "https://plugins.gradle.org/m2/" }
19+
}
20+
21+
dependencies {
22+
classpath 'gradle.plugin.me.champeau.gradle:jmh-gradle-plugin:0.3.0'
23+
}
24+
}
25+
26+
apply plugin: 'me.champeau.gradle.jmh'
27+
28+
jmh {
29+
jmhVersion = '1.15'
30+
jvmArgs = '-XX:+UnlockCommercialFeatures -XX:+FlightRecorder'
31+
profilers = ['gc']
32+
zip64 = true
33+
}
1634

1735
dependencies {
1836
compile project(':reactivesocket-core')
1937
compile project(':reactivesocket-client')
2038
compile project(':reactivesocket-discovery-eureka')
2139
compile project(':reactivesocket-stats-servo')
2240
compile project(':reactivesocket-transport-tcp')
41+
compile project(':reactivesocket-transport-local')
2342

2443
compile project(':reactivesocket-test')
44+
45+
compile 'org.slf4j:slf4j-log4j12:1.7.21'
46+
47+
jmh group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.15'
2548
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf;
15+
16+
import io.reactivesocket.local.LocalClient;
17+
import io.reactivesocket.local.LocalServer;
18+
import io.reactivesocket.perf.util.AbstractMicrobenchmarkBase;
19+
import io.reactivesocket.perf.util.BlackholeSubscriber;
20+
import io.reactivesocket.perf.util.ClientServerHolder;
21+
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
22+
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
23+
import io.reactivesocket.util.PayloadImpl;
24+
import org.openjdk.jmh.annotations.Benchmark;
25+
import org.openjdk.jmh.annotations.BenchmarkMode;
26+
import org.openjdk.jmh.annotations.Level;
27+
import org.openjdk.jmh.annotations.Mode;
28+
import org.openjdk.jmh.annotations.OutputTimeUnit;
29+
import org.openjdk.jmh.annotations.Param;
30+
import org.openjdk.jmh.annotations.Scope;
31+
import org.openjdk.jmh.annotations.Setup;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.infra.Blackhole;
34+
35+
import java.util.concurrent.CountDownLatch;
36+
import java.util.concurrent.ThreadLocalRandom;
37+
import java.util.concurrent.TimeUnit;
38+
39+
@BenchmarkMode(Mode.Throughput)
40+
@OutputTimeUnit(TimeUnit.SECONDS)
41+
@State(Scope.Benchmark)
42+
public class RequestResponsePerf extends AbstractMicrobenchmarkBase {
43+
44+
public static final String TRANSPORT_TCP = "tcp";
45+
public static final String TRANSPORT_LOCAL = "local";
46+
47+
@Param({ TRANSPORT_TCP, TRANSPORT_LOCAL })
48+
public String transport;
49+
50+
public Blackhole bh;
51+
52+
public ClientServerHolder localHolder;
53+
public ClientServerHolder tcpHolder;
54+
55+
@Setup(Level.Trial)
56+
public void setup(Blackhole bh) {
57+
tcpHolder = ClientServerHolder.requestResponse(TcpTransportServer.create(),
58+
socketAddress -> TcpTransportClient.create(socketAddress));
59+
String clientName = "local-" + ThreadLocalRandom.current().nextInt();
60+
localHolder = ClientServerHolder.requestResponse(LocalServer.create(clientName),
61+
socketAddress -> LocalClient.create(clientName));
62+
this.bh = bh;
63+
}
64+
65+
@Benchmark
66+
public void requestResponse() throws InterruptedException {
67+
ClientServerHolder holder;
68+
switch (transport) {
69+
case TRANSPORT_LOCAL:
70+
holder = localHolder;
71+
break;
72+
case TRANSPORT_TCP:
73+
holder = tcpHolder;
74+
break;
75+
default:
76+
throw new IllegalArgumentException("Unknown transport: " + transport);
77+
}
78+
requestResponse(holder);
79+
}
80+
81+
protected void requestResponse(ClientServerHolder holder) throws InterruptedException {
82+
CountDownLatch latch = new CountDownLatch(1);
83+
holder.getClient().requestResponse(new PayloadImpl(ClientServerHolder.HELLO))
84+
.subscribe(new BlackholeSubscriber<>(bh, () -> latch.countDown()));
85+
latch.await();
86+
}
87+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf.util;
15+
16+
import org.openjdk.jmh.annotations.Fork;
17+
import org.openjdk.jmh.annotations.Measurement;
18+
import org.openjdk.jmh.annotations.Scope;
19+
import org.openjdk.jmh.annotations.State;
20+
import org.openjdk.jmh.annotations.Warmup;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* Base class for all JMH benchmarks.
26+
*/
27+
@Warmup(iterations = AbstractMicrobenchmarkBase.DEFAULT_WARMUP_ITERATIONS)
28+
@Measurement(iterations = AbstractMicrobenchmarkBase.DEFAULT_MEASURE_ITERATIONS,
29+
batchSize = AbstractMicrobenchmarkBase.DEFAULT_WARMUP_ITERATIONS,
30+
time = 1, timeUnit = TimeUnit.SECONDS)
31+
@Fork(AbstractMicrobenchmarkBase.DEFAULT_FORKS)
32+
@State(Scope.Thread)
33+
public abstract class AbstractMicrobenchmarkBase {
34+
35+
protected static final int DEFAULT_WARMUP_ITERATIONS = 10;
36+
protected static final int DEFAULT_MEASURE_ITERATIONS = 10;
37+
protected static final int DEFAULT_FORKS = 2;
38+
39+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf.util;
15+
16+
import io.reactivesocket.Payload;
17+
import org.openjdk.jmh.infra.Blackhole;
18+
import org.reactivestreams.Subscriber;
19+
import org.reactivestreams.Subscription;
20+
21+
public class BlackholeSubscriber<T> implements Subscriber<T> {
22+
23+
private final Blackhole blackhole;
24+
private final Runnable onTerminate;
25+
26+
public BlackholeSubscriber(Blackhole blackhole, Runnable onTerminate) {
27+
this.blackhole = blackhole;
28+
this.onTerminate = onTerminate;
29+
}
30+
31+
@Override
32+
public void onSubscribe(Subscription s) {
33+
s.request(1);
34+
}
35+
36+
@Override
37+
public void onNext(T payload) {
38+
blackhole.consume(payload);
39+
}
40+
41+
@Override
42+
public void onError(Throwable t) {
43+
t.printStackTrace();
44+
onTerminate.run();
45+
}
46+
47+
@Override
48+
public void onComplete() {
49+
onTerminate.run();
50+
}
51+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf.util;
15+
16+
import io.reactivesocket.AbstractReactiveSocket;
17+
import io.reactivesocket.Payload;
18+
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.client.KeepAliveProvider;
20+
import io.reactivesocket.client.ReactiveSocketClient;
21+
import io.reactivesocket.client.SetupProvider;
22+
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket;
23+
import io.reactivesocket.reactivestreams.extensions.Px;
24+
import io.reactivesocket.server.ReactiveSocketServer;
25+
import io.reactivesocket.transport.TransportClient;
26+
import io.reactivesocket.transport.TransportServer;
27+
import io.reactivesocket.transport.TransportServer.StartedServer;
28+
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
29+
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
30+
import io.reactivesocket.util.PayloadImpl;
31+
import io.reactivex.Flowable;
32+
import io.reactivex.Observable;
33+
import org.openjdk.jmh.infra.Blackhole;
34+
import org.reactivestreams.Publisher;
35+
36+
import java.net.SocketAddress;
37+
import java.nio.charset.StandardCharsets;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.function.Function;
40+
41+
public class ClientServerHolder {
42+
43+
public static final byte[] HELLO = "HELLO".getBytes(StandardCharsets.UTF_8);
44+
45+
private final StartedServer server;
46+
private final ReactiveSocket client;
47+
48+
public ClientServerHolder(TransportServer transportServer, Function<SocketAddress, TransportClient> clientFactory,
49+
ReactiveSocket handler) {
50+
server = ReactiveSocketServer.create(transportServer)
51+
.start((setup, sendingSocket) -> {
52+
return new DisabledLeaseAcceptingSocket(handler);
53+
});
54+
SetupProvider setupProvider = SetupProvider.keepAlive(KeepAliveProvider.never()).disableLease();
55+
ReactiveSocketClient client =
56+
ReactiveSocketClient.create(clientFactory.apply(server.getServerAddress()), setupProvider);
57+
this.client = Flowable.fromPublisher(client.connect()).blockingLast();
58+
}
59+
60+
public ReactiveSocket getClient() {
61+
return client;
62+
}
63+
64+
public static ClientServerHolder requestResponse(TransportServer transportServer,
65+
Function<SocketAddress, TransportClient> clientFactory) {
66+
return new ClientServerHolder(transportServer, clientFactory, new AbstractReactiveSocket() {
67+
@Override
68+
public Publisher<Payload> requestResponse(Payload payload) {
69+
return Px.just(new PayloadImpl(HELLO));
70+
}
71+
});
72+
}
73+
}

0 commit comments

Comments
 (0)