Skip to content

Commit 19e2f76

Browse files
committed
update tcp ping-pong test with resumable transport case
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 16bc03f commit 19e2f76

File tree

8 files changed

+120
-21
lines changed

8 files changed

+120
-21
lines changed

rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private static Mono<CloseableChannel> newServerRSocket() {
226226
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
227227
return RSocketFactory.receive()
228228
.resume()
229-
.resumeStore(t -> new InMemoryResumableFramesStore("server",100_000))
229+
.resumeStore(t -> new InMemoryResumableFramesStore("server", 100_000))
230230
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
231231
.acceptor((setupPayload, rSocket) -> Mono.just(new TestResponderRSocket()))
232232
.transport(serverTransport(SERVER_HOST, SERVER_PORT))
@@ -239,14 +239,17 @@ private static class TestResponderRSocket extends AbstractRSocket {
239239

240240
@Override
241241
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
242-
return duplicate(Flux.interval(Duration.ofMillis(1))
243-
.onBackpressureLatest().publishOn(Schedulers.elastic()), 20)
242+
return duplicate(
243+
Flux.interval(Duration.ofMillis(1))
244+
.onBackpressureLatest()
245+
.publishOn(Schedulers.elastic()),
246+
20)
244247
.map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement())))
245248
.takeUntilOther(Flux.from(payloads).then());
246249
}
247250

248251
private <T> Flux<T> duplicate(Flux<T> f, int n) {
249-
Flux<T> r =Flux.empty();
252+
Flux<T> r = Flux.empty();
250253
for (int i = 0; i < n; i++) {
251254
r = r.mergeWith(f);
252255
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.rsocket.test;
2+
3+
import java.lang.annotation.*;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
6+
7+
/**
8+
* {@code @PerfTest} is used to signal that the annotated test class or method is performance test,
9+
* and is disabled unless enabled via setting the {@code TEST_PERF_ENABLED} environment variable to
10+
* {@code true}.
11+
*/
12+
@Target({ElementType.TYPE, ElementType.METHOD})
13+
@Retention(RetentionPolicy.RUNTIME)
14+
@Documented
15+
@EnabledIfEnvironmentVariable(named = "TEST_PERF_ENABLED", matches = "(?i)true")
16+
@Test
17+
public @interface PerfTest {}

rsocket-test/src/main/java/io/rsocket/test/PingClient.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import io.rsocket.RSocket;
2121
import io.rsocket.util.ByteBufPayload;
2222
import java.time.Duration;
23+
import java.util.function.BiFunction;
2324
import org.HdrHistogram.Recorder;
25+
import org.reactivestreams.Publisher;
2426
import reactor.core.publisher.Flux;
2527
import reactor.core.publisher.Mono;
2628

@@ -49,16 +51,26 @@ public Recorder startTracker(Duration interval) {
4951
return histogram;
5052
}
5153

52-
public Flux<Payload> startPingPong(int count, final Recorder histogram) {
54+
public Flux<Payload> requestResponsePingPong(int count, final Recorder histogram) {
55+
return pingPong(RSocket::requestResponse, count, histogram);
56+
}
57+
58+
public Flux<Payload> requestStreamPingPong(int count, final Recorder histogram) {
59+
return pingPong(RSocket::requestStream, count, histogram);
60+
}
61+
62+
Flux<Payload> pingPong(
63+
BiFunction<RSocket, ? super Payload, ? extends Publisher<Payload>> interaction,
64+
int count,
65+
final Recorder histogram) {
5366
return client
5467
.flatMapMany(
5568
rsocket ->
5669
Flux.range(1, count)
5770
.flatMap(
5871
i -> {
5972
long start = System.nanoTime();
60-
return rsocket
61-
.requestResponse(payload.retain())
73+
return Flux.from(interaction.apply(rsocket, payload.retain()))
6274
.doOnNext(Payload::release)
6375
.doFinally(
6476
signalType -> {

rsocket-test/src/main/java/io/rsocket/test/PingHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.SocketAcceptor;
2424
import io.rsocket.util.ByteBufPayload;
2525
import java.util.concurrent.ThreadLocalRandom;
26+
import reactor.core.publisher.Flux;
2627
import reactor.core.publisher.Mono;
2728

2829
public class PingHandler implements SocketAcceptor {
@@ -48,6 +49,12 @@ public Mono<Payload> requestResponse(Payload payload) {
4849
payload.release();
4950
return Mono.just(pong.retain());
5051
}
52+
53+
@Override
54+
public Flux<Payload> requestStream(Payload payload) {
55+
payload.release();
56+
return Flux.range(0, 100).map(v -> pong.retain());
57+
}
5158
});
5259
}
5360
}

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static void main(String... args) {
4848
int count = 1_000_000_000;
4949

5050
pingClient
51-
.startPingPong(count, recorder)
51+
.requestResponsePingPong(count, recorder)
5252
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
5353
.blockLast();
5454
}

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,79 @@
1919
import io.rsocket.RSocket;
2020
import io.rsocket.RSocketFactory;
2121
import io.rsocket.frame.decoder.PayloadDecoder;
22+
import io.rsocket.test.PerfTest;
2223
import io.rsocket.test.PingClient;
2324
import io.rsocket.transport.netty.client.TcpClientTransport;
2425
import java.time.Duration;
2526
import org.HdrHistogram.Recorder;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
2629
import reactor.core.publisher.Mono;
2730

31+
@PerfTest
2832
public final class TcpPing {
33+
private static final int INTERACTIONS_COUNT = 1_000_000_000;
34+
private static final int port = Integer.valueOf(System.getProperty("RSOCKET_TEST_PORT", "7878"));
2935

30-
public static void main(String... args) {
31-
Mono<RSocket> client =
32-
RSocketFactory.connect()
33-
.frameDecoder(PayloadDecoder.ZERO_COPY)
34-
.transport(TcpClientTransport.create(7878))
35-
.start();
36+
@BeforeEach
37+
void setUp() {
38+
System.out.println("Starting ping-pong test (TCP transport)");
39+
System.out.println("port: " + port);
40+
}
3641

37-
PingClient pingClient = new PingClient(client);
42+
@Test
43+
void requestResponseTest() {
44+
PingClient pingClient = newPingClient();
45+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
3846

47+
pingClient
48+
.requestResponsePingPong(INTERACTIONS_COUNT, recorder)
49+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
50+
.blockLast();
51+
}
52+
53+
@Test
54+
void requestStreamTest() {
55+
PingClient pingClient = newPingClient();
3956
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
4057

41-
int count = 1_000_000_000;
58+
pingClient
59+
.requestStreamPingPong(INTERACTIONS_COUNT, recorder)
60+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
61+
.blockLast();
62+
}
63+
64+
@Test
65+
void requestStreamResumableTest() {
66+
PingClient pingClient = newResumablePingClient();
67+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
4268

4369
pingClient
44-
.startPingPong(count, recorder)
45-
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
70+
.requestStreamPingPong(INTERACTIONS_COUNT, recorder)
71+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
4672
.blockLast();
4773
}
74+
75+
private static PingClient newPingClient() {
76+
return newPingClient(false);
77+
}
78+
79+
private static PingClient newResumablePingClient() {
80+
return newPingClient(true);
81+
}
82+
83+
private static PingClient newPingClient(boolean isResumable) {
84+
RSocketFactory.ClientRSocketFactory clientRSocketFactory = RSocketFactory.connect();
85+
if (isResumable) {
86+
clientRSocketFactory.resume();
87+
}
88+
Mono<RSocket> rSocket =
89+
clientRSocketFactory
90+
.frameDecoder(PayloadDecoder.ZERO_COPY)
91+
.keepAlive(Duration.ofMinutes(1), Duration.ofMinutes(30), 3)
92+
.transport(TcpClientTransport.create(port))
93+
.start();
94+
95+
return new PingClient(rSocket);
96+
}
4897
}

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,23 @@
2222
import io.rsocket.transport.netty.server.TcpServerTransport;
2323

2424
public final class TcpPongServer {
25+
private static final boolean isResume =
26+
Boolean.valueOf(System.getProperty("RSOCKET_TEST_RESUME", "false"));
27+
private static final int port = Integer.valueOf(System.getProperty("RSOCKET_TEST_PORT", "7878"));
2528

2629
public static void main(String... args) {
27-
RSocketFactory.receive()
30+
System.out.println("Starting TCP ping-pong server");
31+
System.out.println("port: " + port);
32+
System.out.println("resume enabled: " + isResume);
33+
34+
RSocketFactory.ServerRSocketFactory serverRSocketFactory = RSocketFactory.receive();
35+
if (isResume) {
36+
serverRSocketFactory.resume();
37+
}
38+
serverRSocketFactory
2839
.frameDecoder(PayloadDecoder.ZERO_COPY)
2940
.acceptor(new PingHandler())
30-
.transport(TcpServerTransport.create(7878))
41+
.transport(TcpServerTransport.create(port))
3142
.start()
3243
.block()
3344
.onClose()

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketPing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static void main(String... args) {
3737
int count = 1_000_000_000;
3838

3939
pingClient
40-
.startPingPong(count, recorder)
40+
.requestResponsePingPong(count, recorder)
4141
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
4242
.blockLast();
4343
}

0 commit comments

Comments
 (0)