Skip to content

General performance improvements #598

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ plugins {
id 'com.github.sherter.google-java-format' version '0.7.1' apply false
id 'com.jfrog.artifactory' version '4.7.3' apply false
id 'com.jfrog.bintray' version '1.8.4' apply false
id 'me.champeau.gradle.jmh' version '0.4.7' apply false
id 'io.spring.dependency-management' version '1.0.6.RELEASE' apply false
id 'me.champeau.gradle.jmh' version '0.4.8' apply false
id 'io.spring.dependency-management' version '1.0.7.RELEASE' apply false
id 'io.morethan.jmhreport' version '0.9.0' apply false
}

Expand Down
2 changes: 2 additions & 0 deletions rsocket-core/jmh.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ dependencies {
jmh configurations.implementation
jmh 'org.openjdk.jmh:jmh-core'
jmh 'org.openjdk.jmh:jmh-generator-annprocess'
jmh 'io.projectreactor:reactor-test'
jmh project(':rsocket-transport-local')
}

jmhCompileGeneratedClasses.enabled = false
Expand Down
38 changes: 38 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/MaxPerfSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.rsocket;

import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

public class MaxPerfSubscriber implements CoreSubscriber<Payload> {

final CountDownLatch latch = new CountDownLatch(1);
final Blackhole blackhole;

public MaxPerfSubscriber(Blackhole blackhole) {
this.blackhole = blackhole;
}

@Override
public void onSubscribe(Subscription s) {
s.request(Long.MAX_VALUE);
}

@Override
public void onNext(Payload payload) {
payload.release();
blackhole.consume(payload);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
latch.countDown();
}

@Override
public void onComplete() {
latch.countDown();
}
}
42 changes: 42 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/PerfSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package io.rsocket;

import java.util.concurrent.CountDownLatch;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;

public class PerfSubscriber implements CoreSubscriber<Payload> {

final CountDownLatch latch = new CountDownLatch(1);
final Blackhole blackhole;

Subscription s;

public PerfSubscriber(Blackhole blackhole) {
this.blackhole = blackhole;
}

@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(1);
}

@Override
public void onNext(Payload payload) {
payload.release();
blackhole.consume(payload);
s.request(1);
}

@Override
public void onError(Throwable t) {
blackhole.consume(t);
latch.countDown();
}

@Override
public void onComplete() {
latch.countDown();
}
}
142 changes: 142 additions & 0 deletions rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package io.rsocket;

import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.local.LocalClientTransport;
import io.rsocket.transport.local.LocalServerTransport;
import io.rsocket.util.EmptyPayload;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@BenchmarkMode(Mode.Throughput)
@Fork(
value = 1 // , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
)
@Warmup(iterations = 10)
@Measurement(iterations = 10, time = 20)
@State(Scope.Benchmark)
public class RSocketPerf {

static final Payload PAYLOAD = EmptyPayload.INSTANCE;
static final Mono<Payload> PAYLOAD_MONO = Mono.just(PAYLOAD);
static final Flux<Payload> PAYLOAD_FLUX =
Flux.fromArray(IntStream.range(0, 100000).mapToObj(__ -> PAYLOAD).toArray(Payload[]::new));

RSocket client;
Closeable server;

@Setup
public void setUp() {
server =
RSocketFactory.receive()
.acceptor(
(setup, sendingSocket) ->
Mono.just(
new AbstractRSocket() {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.empty();
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return PAYLOAD_MONO;
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return PAYLOAD_FLUX;
}

@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.from(payloads);
}
}))
.transport(LocalServerTransport.create("server"))
.start()
.block();

client =
RSocketFactory.connect()
.frameDecoder(PayloadDecoder.ZERO_COPY)
.transport(LocalClientTransport.create("server"))
.start()
.block();
}

@Benchmark
@SuppressWarnings("unchecked")
public PerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestResponse(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestStream(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole)
throws InterruptedException {
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
client.requestStream(PAYLOAD).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public PerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole blackhole)
throws InterruptedException {
PerfSubscriber subscriber = new PerfSubscriber(blackhole);
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}

@Benchmark
public MaxPerfSubscriber requestChannelWithRequestAllStrategy(Blackhole blackhole)
throws InterruptedException {
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
subscriber.latch.await();

return subscriber;
}
}
Loading