Skip to content

Commit b39d30b

Browse files
NiteshKantstevegury
authored andcommitted
JMH test for larger payloads (#195)
#### Problem Current JMH tests only benchmark smaller payloads "hello". It will be good to know the numbers with larger payloads. #### Modifications - Refactored `RequestResponsePerf` and extracted a base class that can be used in all benchmarks. - Added another benchmark class to test different payload sizes. #### Result More insight into perf.
1 parent 63d4a26 commit b39d30b

File tree

5 files changed

+220
-62
lines changed

5 files changed

+220
-62
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf;
15+
16+
import io.reactivesocket.Payload;
17+
import io.reactivesocket.ReactiveSocket;
18+
import io.reactivesocket.local.LocalClient;
19+
import io.reactivesocket.local.LocalServer;
20+
import io.reactivesocket.perf.util.AbstractMicrobenchmarkBase;
21+
import io.reactivesocket.perf.util.BlackholeSubscriber;
22+
import io.reactivesocket.perf.util.ClientServerHolder;
23+
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
24+
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
25+
import io.reactivex.Flowable;
26+
import org.openjdk.jmh.annotations.Param;
27+
import org.openjdk.jmh.infra.Blackhole;
28+
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.ThreadLocalRandom;
31+
import java.util.function.Supplier;
32+
33+
public class AbstractReactiveSocketPerf extends AbstractMicrobenchmarkBase {
34+
35+
public static final String TRANSPORT_TCP_MULTI_CONNECTIONS = "tcp_multi_connections";
36+
public static final String TRANSPORT_TCP = "tcp";
37+
public static final String TRANSPORT_LOCAL = "local";
38+
39+
@Param({ TRANSPORT_TCP_MULTI_CONNECTIONS, TRANSPORT_TCP, TRANSPORT_LOCAL })
40+
public String transport;
41+
42+
protected Blackhole bh;
43+
protected Supplier<ReactiveSocket> localHolder;
44+
protected Supplier<ReactiveSocket> tcpHolder;
45+
protected Supplier<ReactiveSocket> multiClientTcpHolders;
46+
47+
protected void _setup(Blackhole bh) {
48+
tcpHolder = ClientServerHolder.create(TcpTransportServer.create(),
49+
socketAddress -> TcpTransportClient.create(socketAddress));
50+
String clientName = "local-" + ThreadLocalRandom.current().nextInt();
51+
localHolder = ClientServerHolder.create(LocalServer.create(clientName),
52+
socketAddress -> LocalClient.create(clientName));
53+
multiClientTcpHolders = ClientServerHolder.requestResponseMultiTcp(Runtime.getRuntime().availableProcessors());
54+
this.bh = bh;
55+
}
56+
57+
protected Supplier<ReactiveSocket> getSocketSupplier() {
58+
Supplier<ReactiveSocket> socketSupplier;
59+
switch (transport) {
60+
case TRANSPORT_LOCAL:
61+
socketSupplier = localHolder;
62+
break;
63+
case TRANSPORT_TCP:
64+
socketSupplier = tcpHolder;
65+
break;
66+
case TRANSPORT_TCP_MULTI_CONNECTIONS:
67+
socketSupplier = multiClientTcpHolders;
68+
break;
69+
default:
70+
throw new IllegalArgumentException("Unknown transport: " + transport);
71+
}
72+
return socketSupplier;
73+
}
74+
75+
protected void requestResponse(Supplier<ReactiveSocket> socketSupplier, Supplier<Payload> payloadSupplier,
76+
int requestCount)
77+
throws InterruptedException {
78+
CountDownLatch latch = new CountDownLatch(requestCount);
79+
for (int i = 0; i < requestCount; i++) {
80+
socketSupplier.get()
81+
.requestResponse(payloadSupplier.get())
82+
.subscribe(new BlackholeSubscriber<>(bh, () -> latch.countDown()));
83+
}
84+
latch.await();
85+
}
86+
87+
protected void requestStream(Supplier<ReactiveSocket> socketSupplier, Supplier<Payload> payloadSupplier,
88+
int requestCount, int itemCount)
89+
throws InterruptedException {
90+
CountDownLatch latch = new CountDownLatch(requestCount);
91+
for (int i = 0; i < requestCount; i++) {
92+
Flowable.fromPublisher(socketSupplier.get().requestStream(payloadSupplier.get()))
93+
.take(itemCount)
94+
.subscribe(new BlackholeSubscriber<>(bh, () -> latch.countDown()));
95+
}
96+
latch.await();
97+
}
98+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf;
15+
16+
import io.reactivesocket.ReactiveSocket;
17+
import io.reactivesocket.util.PayloadImpl;
18+
import org.openjdk.jmh.annotations.Benchmark;
19+
import org.openjdk.jmh.annotations.BenchmarkMode;
20+
import org.openjdk.jmh.annotations.Level;
21+
import org.openjdk.jmh.annotations.Mode;
22+
import org.openjdk.jmh.annotations.OutputTimeUnit;
23+
import org.openjdk.jmh.annotations.Param;
24+
import org.openjdk.jmh.annotations.Scope;
25+
import org.openjdk.jmh.annotations.Setup;
26+
import org.openjdk.jmh.annotations.State;
27+
import org.openjdk.jmh.infra.Blackhole;
28+
29+
import java.util.concurrent.ThreadLocalRandom;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Supplier;
32+
33+
@BenchmarkMode(Mode.Throughput)
34+
@OutputTimeUnit(TimeUnit.SECONDS)
35+
@State(Scope.Benchmark)
36+
public class RequestResponseLargePayloadPerf extends AbstractReactiveSocketPerf {
37+
38+
@Param({"16", "1024"})
39+
public int payloadSizeKb;
40+
41+
private byte[] payloadBytes;
42+
43+
@Setup(Level.Trial)
44+
public void setup(Blackhole bh) {
45+
_setup(bh);
46+
payloadBytes = new byte[1024 * payloadSizeKb];
47+
ThreadLocalRandom.current().nextBytes(payloadBytes);
48+
}
49+
50+
@Benchmark
51+
public void requestResponseLargePayload() throws InterruptedException {
52+
Supplier<ReactiveSocket> socketSupplier = getSocketSupplier();
53+
requestResponse(socketSupplier, () -> new PayloadImpl(payloadBytes), 1);
54+
}
55+
}

reactivesocket-examples/src/jmh/java/io/reactivesocket/perf/RequestResponsePerf.java

Lines changed: 4 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,7 @@
1414
package io.reactivesocket.perf;
1515

1616
import io.reactivesocket.ReactiveSocket;
17-
import io.reactivesocket.local.LocalClient;
18-
import io.reactivesocket.local.LocalServer;
19-
import io.reactivesocket.perf.util.AbstractMicrobenchmarkBase;
20-
import io.reactivesocket.perf.util.BlackholeSubscriber;
2117
import io.reactivesocket.perf.util.ClientServerHolder;
22-
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
23-
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
2418
import io.reactivesocket.util.PayloadImpl;
2519
import org.openjdk.jmh.annotations.Benchmark;
2620
import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -33,69 +27,25 @@
3327
import org.openjdk.jmh.annotations.State;
3428
import org.openjdk.jmh.infra.Blackhole;
3529

36-
import java.util.concurrent.CountDownLatch;
37-
import java.util.concurrent.ThreadLocalRandom;
3830
import java.util.concurrent.TimeUnit;
3931
import java.util.function.Supplier;
4032

4133
@BenchmarkMode(Mode.Throughput)
4234
@OutputTimeUnit(TimeUnit.SECONDS)
4335
@State(Scope.Benchmark)
44-
public class RequestResponsePerf extends AbstractMicrobenchmarkBase {
45-
46-
public static final String TRANSPORT_TCP_MULTI_CONNECTIONS = "tcp_multi_connections";
47-
public static final String TRANSPORT_TCP = "tcp";
48-
public static final String TRANSPORT_LOCAL = "local";
49-
50-
@Param({ TRANSPORT_TCP_MULTI_CONNECTIONS, TRANSPORT_TCP, TRANSPORT_LOCAL })
51-
public String transport;
36+
public class RequestResponsePerf extends AbstractReactiveSocketPerf {
5237

5338
@Param({ "1", "100", "1000"})
5439
public int requestCount;
5540

56-
public Blackhole bh;
57-
58-
public Supplier<ReactiveSocket> localHolder;
59-
public Supplier<ReactiveSocket> tcpHolder;
60-
public Supplier<ReactiveSocket> multiClientTcpHolders;
61-
6241
@Setup(Level.Trial)
6342
public void setup(Blackhole bh) {
64-
tcpHolder = ClientServerHolder.requestResponse(TcpTransportServer.create(),
65-
socketAddress -> TcpTransportClient.create(socketAddress));
66-
String clientName = "local-" + ThreadLocalRandom.current().nextInt();
67-
localHolder = ClientServerHolder.requestResponse(LocalServer.create(clientName),
68-
socketAddress -> LocalClient.create(clientName));
69-
multiClientTcpHolders = ClientServerHolder.requestResponseMultiTcp(Runtime.getRuntime().availableProcessors());
70-
this.bh = bh;
43+
_setup(bh);
7144
}
7245

7346
@Benchmark
7447
public void requestResponse() throws InterruptedException {
75-
Supplier<ReactiveSocket> socketSupplier;
76-
switch (transport) {
77-
case TRANSPORT_LOCAL:
78-
socketSupplier = localHolder;
79-
break;
80-
case TRANSPORT_TCP:
81-
socketSupplier = tcpHolder;
82-
break;
83-
case TRANSPORT_TCP_MULTI_CONNECTIONS:
84-
socketSupplier = multiClientTcpHolders;
85-
break;
86-
default:
87-
throw new IllegalArgumentException("Unknown transport: " + transport);
88-
}
89-
requestResponse(socketSupplier);
90-
}
91-
92-
protected void requestResponse(Supplier<ReactiveSocket> socketSupplier) throws InterruptedException {
93-
CountDownLatch latch = new CountDownLatch(requestCount);
94-
for (int i = 0; i < requestCount; i++) {
95-
socketSupplier.get()
96-
.requestResponse(new PayloadImpl(ClientServerHolder.HELLO))
97-
.subscribe(new BlackholeSubscriber<>(bh, () -> latch.countDown()));
98-
}
99-
latch.await();
48+
Supplier<ReactiveSocket> socketSupplier = getSocketSupplier();
49+
requestResponse(socketSupplier, () -> new PayloadImpl(ClientServerHolder.HELLO), requestCount);
10050
}
10151
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.perf;
15+
16+
import io.reactivesocket.ReactiveSocket;
17+
import io.reactivesocket.perf.util.ClientServerHolder;
18+
import io.reactivesocket.util.PayloadImpl;
19+
import org.openjdk.jmh.annotations.Benchmark;
20+
import org.openjdk.jmh.annotations.BenchmarkMode;
21+
import org.openjdk.jmh.annotations.Level;
22+
import org.openjdk.jmh.annotations.Mode;
23+
import org.openjdk.jmh.annotations.OutputTimeUnit;
24+
import org.openjdk.jmh.annotations.Param;
25+
import org.openjdk.jmh.annotations.Scope;
26+
import org.openjdk.jmh.annotations.Setup;
27+
import org.openjdk.jmh.annotations.State;
28+
import org.openjdk.jmh.infra.Blackhole;
29+
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.function.Supplier;
32+
33+
@BenchmarkMode(Mode.Throughput)
34+
@OutputTimeUnit(TimeUnit.SECONDS)
35+
@State(Scope.Benchmark)
36+
public class RequestStreamPerf extends AbstractReactiveSocketPerf {
37+
38+
@Param({ "1", "100", "1000"})
39+
public int itemCount;
40+
41+
@Setup(Level.Trial)
42+
public void setup(Blackhole bh) {
43+
_setup(bh);
44+
}
45+
46+
@Benchmark
47+
public void requestStream() throws InterruptedException {
48+
Supplier<ReactiveSocket> socketSupplier = getSocketSupplier();
49+
requestStream(socketSupplier, () -> new PayloadImpl(ClientServerHolder.HELLO), 1, itemCount);
50+
}
51+
}

reactivesocket-examples/src/jmh/java/io/reactivesocket/perf/util/ClientServerHolder.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,10 @@
2929
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
3030
import io.reactivesocket.util.PayloadImpl;
3131
import io.reactivex.Flowable;
32-
import io.reactivex.Observable;
33-
import org.openjdk.jmh.infra.Blackhole;
3432
import org.reactivestreams.Publisher;
3533

3634
import java.net.SocketAddress;
3735
import java.nio.charset.StandardCharsets;
38-
import java.util.concurrent.CountDownLatch;
3936
import java.util.concurrent.atomic.AtomicInteger;
4037
import java.util.function.Function;
4138
import java.util.function.Supplier;
@@ -58,13 +55,13 @@ public ReactiveSocket get() {
5855
return client;
5956
}
6057

61-
public static ClientServerHolder requestResponse(TransportServer transportServer,
62-
Function<SocketAddress, TransportClient> clientFactory) {
63-
return new ClientServerHolder(transportServer, clientFactory, new RequestResponseHandler());
58+
public static ClientServerHolder create(TransportServer transportServer,
59+
Function<SocketAddress, TransportClient> clientFactory) {
60+
return new ClientServerHolder(transportServer, clientFactory, new Handler());
6461
}
6562

6663
public static Supplier<ReactiveSocket> requestResponseMultiTcp(int clientCount) {
67-
StartedServer server = startServer(TcpTransportServer.create(), new RequestResponseHandler());
64+
StartedServer server = startServer(TcpTransportServer.create(), new Handler());
6865
final ReactiveSocket[] sockets = new ReactiveSocket[clientCount];
6966
for (int i = 0; i < clientCount; i++) {
7067
sockets[i] = newClient(server.getServerAddress(), sock -> TcpTransportClient.create(sock));
@@ -96,10 +93,17 @@ private static ReactiveSocket newClient(SocketAddress serverAddress,
9693
return Flowable.fromPublisher(client.connect()).blockingLast();
9794
}
9895

99-
private static class RequestResponseHandler extends AbstractReactiveSocket {
96+
private static class Handler extends AbstractReactiveSocket {
97+
10098
@Override
10199
public Publisher<Payload> requestResponse(Payload payload) {
102100
return Px.just(new PayloadImpl(HELLO));
103101
}
102+
103+
@Override
104+
public Publisher<Payload> requestStream(Payload payload) {
105+
return Flowable.range(1, Integer.MAX_VALUE)
106+
.map(integer -> new PayloadImpl(HELLO));
107+
}
104108
}
105109
}

0 commit comments

Comments
 (0)