Skip to content

Commit 12669c7

Browse files
committed
Update RSocketPerf.java
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 3b315c5 commit 12669c7

File tree

2 files changed

+33
-19
lines changed

2 files changed

+33
-19
lines changed

rsocket-core/src/jmh/java/io/rsocket/RSocketPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222

2323
@BenchmarkMode(Mode.Throughput)
2424
@Fork(
25-
value = 1 , jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
25+
value = 1 //, jvmArgsAppend = {"-Dio.netty.leakDetection.level=advanced"}
2626
)
2727
@Warmup(iterations = 10)
2828
@Measurement(iterations = 10, time = 10)

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,51 +22,65 @@
2222
import io.rsocket.RSocket;
2323
import io.rsocket.RSocketFactory;
2424
import io.rsocket.SocketAcceptor;
25-
import io.rsocket.transport.netty.client.TcpClientTransport;
26-
import io.rsocket.transport.netty.server.TcpServerTransport;
27-
import io.rsocket.util.DefaultPayload;
25+
import io.rsocket.frame.decoder.PayloadDecoder;
26+
import io.rsocket.transport.local.LocalClientTransport;
27+
import io.rsocket.transport.local.LocalServerTransport;
28+
import io.rsocket.util.ByteBufPayload;
2829
import java.time.Duration;
2930
import org.reactivestreams.Publisher;
3031
import reactor.core.publisher.Flux;
3132
import reactor.core.publisher.Mono;
33+
import reactor.core.scheduler.Schedulers;
3234

3335
public final class ChannelEchoClient {
36+
static final Payload payload1 = ByteBufPayload.create("Hello ");
3437

3538
public static void main(String[] args) {
3639
RSocketFactory.receive()
40+
.frameDecoder(PayloadDecoder.ZERO_COPY)
3741
.acceptor(new SocketAcceptorImpl())
38-
.transport(TcpServerTransport.create("localhost", 7000))
42+
.transport(LocalServerTransport.create("localhost"))
3943
.start()
4044
.subscribe();
4145

4246
RSocket socket =
4347
RSocketFactory.connect()
44-
.transport(TcpClientTransport.create("localhost", 7000))
48+
.keepAliveAckTimeout(Duration.ofMinutes(10))
49+
.frameDecoder(PayloadDecoder.ZERO_COPY)
50+
.transport(LocalClientTransport.create("localhost"))
4551
.start()
4652
.block();
4753

48-
socket
49-
.requestChannel(
50-
Flux.interval(Duration.ofMillis(1000)).map(i -> DefaultPayload.create("Hello")))
51-
.map(Payload::getDataUtf8)
52-
.doOnNext(System.out::println)
53-
.take(10)
54-
.doFinally(signalType -> socket.dispose())
55-
.then()
56-
.block();
54+
Flux.range(0, 100000000)
55+
.concatMap(i -> socket.fireAndForget(payload1.retain()))
56+
// .doOnNext(p -> {
57+
//// System.out.println(p.getDataUtf8());
58+
// p.release();
59+
// })
60+
.blockLast();
5761
}
5862

5963
private static class SocketAcceptorImpl implements SocketAcceptor {
6064
@Override
6165
public Mono<RSocket> accept(ConnectionSetupPayload setupPayload, RSocket reactiveSocket) {
6266
return Mono.just(
6367
new AbstractRSocket() {
68+
69+
@Override
70+
public Mono<Void> fireAndForget(Payload payload) {
71+
// System.out.println(payload.getDataUtf8());
72+
payload.release();
73+
return Mono.empty();
74+
}
75+
76+
@Override
77+
public Mono<Payload> requestResponse(Payload payload) {
78+
return Mono.just(payload);
79+
}
80+
6481
@Override
6582
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
66-
return Flux.from(payloads)
67-
.map(Payload::getDataUtf8)
68-
.map(s -> "Echo: " + s)
69-
.map(DefaultPayload::create);
83+
return Flux.from(payloads).subscribeOn(Schedulers.single());
7084
}
7185
});
7286
}

0 commit comments

Comments
 (0)