Skip to content

Commit af55072

Browse files
committed
provides performance improvements
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 576c0f5 commit af55072

File tree

12 files changed

+682
-313
lines changed

12 files changed

+682
-313
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@ plugins {
2020
id 'com.github.sherter.google-java-format' version '0.7.1' apply false
2121
id 'com.jfrog.artifactory' version '4.7.3' apply false
2222
id 'com.jfrog.bintray' version '1.8.4' apply false
23-
id 'me.champeau.gradle.jmh' version '0.4.7' apply false
24-
id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false
23+
id 'me.champeau.gradle.jmh' version '0.4.8' apply false
24+
id 'io.spring.dependency-management' version '1.0.7.RELEASE' apply false
2525
id 'io.morethan.jmhreport' version '0.9.0' apply false
2626
}
2727

rsocket-core/jmh.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ dependencies {
1919
jmh configurations.implementation
2020
jmh 'org.openjdk.jmh:jmh-core'
2121
jmh 'org.openjdk.jmh:jmh-generator-annprocess'
22+
jmh 'io.projectreactor:reactor-test'
23+
jmh project(':rsocket-transport-local')
2224
}
2325

2426
jmhCompileGeneratedClasses.enabled = false
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.rsocket;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import org.openjdk.jmh.infra.Blackhole;
5+
import org.reactivestreams.Subscription;
6+
import reactor.core.CoreSubscriber;
7+
8+
public class MaxPerfSubscriber implements CoreSubscriber<Payload> {
9+
10+
final CountDownLatch latch = new CountDownLatch(1);
11+
final Blackhole blackhole;
12+
13+
public MaxPerfSubscriber(Blackhole blackhole) {
14+
this.blackhole = blackhole;
15+
}
16+
17+
@Override
18+
public void onSubscribe(Subscription s) {
19+
s.request(Long.MAX_VALUE);
20+
}
21+
22+
@Override
23+
public void onNext(Payload payload) {
24+
payload.release();
25+
blackhole.consume(payload);
26+
}
27+
28+
@Override
29+
public void onError(Throwable t) {
30+
blackhole.consume(t);
31+
latch.countDown();
32+
}
33+
34+
@Override
35+
public void onComplete() {
36+
latch.countDown();
37+
}
38+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.rsocket;
2+
3+
import java.util.concurrent.CountDownLatch;
4+
import org.openjdk.jmh.infra.Blackhole;
5+
import org.reactivestreams.Subscription;
6+
import reactor.core.CoreSubscriber;
7+
8+
public class PerfSubscriber implements CoreSubscriber<Payload> {
9+
10+
final CountDownLatch latch = new CountDownLatch(1);
11+
final Blackhole blackhole;
12+
13+
Subscription s;
14+
15+
public PerfSubscriber(Blackhole blackhole) {
16+
this.blackhole = blackhole;
17+
}
18+
19+
@Override
20+
public void onSubscribe(Subscription s) {
21+
this.s = s;
22+
s.request(1);
23+
}
24+
25+
@Override
26+
public void onNext(Payload payload) {
27+
payload.release();
28+
blackhole.consume(payload);
29+
s.request(1);
30+
}
31+
32+
@Override
33+
public void onError(Throwable t) {
34+
blackhole.consume(t);
35+
latch.countDown();
36+
}
37+
38+
@Override
39+
public void onComplete() {
40+
latch.countDown();
41+
}
42+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
package io.rsocket;
2+
3+
import io.rsocket.frame.decoder.PayloadDecoder;
4+
import io.rsocket.transport.local.LocalClientTransport;
5+
import io.rsocket.transport.local.LocalServerTransport;
6+
import io.rsocket.util.EmptyPayload;
7+
import java.util.stream.IntStream;
8+
import org.openjdk.jmh.annotations.Benchmark;
9+
import org.openjdk.jmh.annotations.BenchmarkMode;
10+
import org.openjdk.jmh.annotations.Fork;
11+
import org.openjdk.jmh.annotations.Measurement;
12+
import org.openjdk.jmh.annotations.Mode;
13+
import org.openjdk.jmh.annotations.Scope;
14+
import org.openjdk.jmh.annotations.Setup;
15+
import org.openjdk.jmh.annotations.State;
16+
import org.openjdk.jmh.annotations.Warmup;
17+
import org.openjdk.jmh.infra.Blackhole;
18+
import org.reactivestreams.Publisher;
19+
import reactor.core.CoreSubscriber;
20+
import reactor.core.publisher.Flux;
21+
import reactor.core.publisher.Mono;
22+
23+
@BenchmarkMode(Mode.Throughput)
24+
@Fork(
25+
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=paranoid"}
26+
)
27+
@Warmup(iterations = 10)
28+
@Measurement(iterations = 10, time = 10)
29+
@State(Scope.Benchmark)
30+
public class RSocketPerf {
31+
32+
static final Payload PAYLOAD = EmptyPayload.INSTANCE;
33+
static final Mono<Payload> PAYLOAD_MONO = Mono.just(PAYLOAD);
34+
static final Flux<Payload> PAYLOAD_FLUX =
35+
Flux.fromArray(IntStream.range(0, 100000).mapToObj(__ -> PAYLOAD).toArray(Payload[]::new));
36+
37+
RSocket client;
38+
Closeable server;
39+
40+
@Setup
41+
public void setUp(Blackhole blackhole) {
42+
server =
43+
RSocketFactory.receive()
44+
.acceptor(
45+
(setup, sendingSocket) ->
46+
Mono.just(
47+
new AbstractRSocket() {
48+
49+
@Override
50+
public Mono<Void> fireAndForget(Payload payload) {
51+
payload.release();
52+
blackhole.consume(payload);
53+
54+
return Mono.empty();
55+
}
56+
57+
@Override
58+
public Mono<Payload> requestResponse(Payload payload) {
59+
payload.release();
60+
return PAYLOAD_MONO;
61+
}
62+
63+
@Override
64+
public Flux<Payload> requestStream(Payload payload) {
65+
payload.release();
66+
return PAYLOAD_FLUX;
67+
}
68+
69+
@Override
70+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
71+
return Flux.from(payloads);
72+
}
73+
}))
74+
.transport(LocalServerTransport.create("server"))
75+
.start()
76+
.block();
77+
78+
client =
79+
RSocketFactory.connect()
80+
.frameDecoder(PayloadDecoder.ZERO_COPY)
81+
.transport(LocalClientTransport.create("server"))
82+
.start()
83+
.block();
84+
}
85+
86+
@Benchmark
87+
@SuppressWarnings("unchecked")
88+
public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
89+
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
90+
client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber);
91+
subscriber.latch.await();
92+
93+
return subscriber;
94+
}
95+
96+
@Benchmark
97+
public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
98+
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
99+
client.requestResponse(PAYLOAD).subscribe(subscriber);
100+
subscriber.latch.await();
101+
102+
return subscriber;
103+
}
104+
105+
@Benchmark
106+
public PerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole)
107+
throws InterruptedException {
108+
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
109+
client.requestStream(PAYLOAD).subscribe(subscriber);
110+
subscriber.latch.await();
111+
112+
return subscriber;
113+
}
114+
115+
@Benchmark
116+
public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole)
117+
throws InterruptedException {
118+
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
119+
client.requestStream(PAYLOAD).subscribe(subscriber);
120+
subscriber.latch.await();
121+
122+
return subscriber;
123+
}
124+
125+
@Benchmark
126+
public PerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole)
127+
throws InterruptedException {
128+
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
129+
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
130+
subscriber.latch.await();
131+
132+
return subscriber;
133+
}
134+
135+
@Benchmark
136+
public MaxPerfSubscriber requestChannelWithRequestAllStrategy(Blackhole blackhole)
137+
throws InterruptedException {
138+
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
139+
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
140+
subscriber.latch.await();
141+
142+
return subscriber;
143+
}
144+
}

0 commit comments

Comments
 (0)