Skip to content

Commit 148e6ea

Browse files
Merge pull request #606 from rsocket/resume-tests
update tcp performance test with resume case
2 parents f55cf8f + 19e2f76 commit 148e6ea

File tree

8 files changed

+131
-21
lines changed

8 files changed

+131
-21
lines changed

rsocket-examples/src/test/java/io/rsocket/resume/ResumeIntegrationTest.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import reactor.core.publisher.Flux;
4141
import reactor.core.publisher.Mono;
4242
import reactor.core.publisher.ReplayProcessor;
43+
import reactor.core.scheduler.Schedulers;
4344
import reactor.test.StepVerifier;
4445

4546
@SlowTest
@@ -83,7 +84,7 @@ public void reconnectOnDisconnect() {
8384
StepVerifier.create(
8485
rSocket
8586
.requestChannel(testRequest())
86-
.take(Duration.ofSeconds(120))
87+
.take(Duration.ofSeconds(600))
8788
.map(Payload::getDataUtf8)
8889
.timeout(Duration.ofSeconds(12))
8990
.doOnNext(x -> throwOnNonContinuous(counter, x))
@@ -210,7 +211,8 @@ private static Mono<RSocket> newClientRSocket(
210211
return RSocketFactory.connect()
211212
.resume()
212213
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
213-
.keepAliveTickPeriod(Duration.ofSeconds(1))
214+
.keepAliveTickPeriod(Duration.ofSeconds(30))
215+
.keepAliveAckTimeout(Duration.ofMinutes(5))
214216
.errorConsumer(errConsumer)
215217
.resumeStrategy(() -> new PeriodicResumeStrategy(Duration.ofSeconds(1)))
216218
.transport(clientTransport)
@@ -224,6 +226,7 @@ private static Mono<CloseableChannel> newServerRSocket() {
224226
private static Mono<CloseableChannel> newServerRSocket(int sessionDurationSeconds) {
225227
return RSocketFactory.receive()
226228
.resume()
229+
.resumeStore(t -> new InMemoryResumableFramesStore("server", 100_000))
227230
.resumeSessionDuration(Duration.ofSeconds(sessionDurationSeconds))
228231
.acceptor((setupPayload, rSocket) -> Mono.just(new TestResponderRSocket()))
229232
.transport(serverTransport(SERVER_HOST, SERVER_PORT))
@@ -236,10 +239,21 @@ private static class TestResponderRSocket extends AbstractRSocket {
236239

237240
@Override
238241
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
239-
return Flux.interval(Duration.ofMillis(1))
240-
.onBackpressureLatest()
242+
return duplicate(
243+
Flux.interval(Duration.ofMillis(1))
244+
.onBackpressureLatest()
245+
.publishOn(Schedulers.elastic()),
246+
20)
241247
.map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement())))
242248
.takeUntilOther(Flux.from(payloads).then());
243249
}
250+
251+
private <T> Flux<T> duplicate(Flux<T> f, int n) {
252+
Flux<T> r = Flux.empty();
253+
for (int i = 0; i < n; i++) {
254+
r = r.mergeWith(f);
255+
}
256+
return r;
257+
}
244258
}
245259
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.rsocket.test;
2+
3+
import java.lang.annotation.*;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
6+
7+
/**
8+
* {@code @PerfTest} is used to signal that the annotated test class or method is performance test,
9+
* and is disabled unless enabled via setting the {@code TEST_PERF_ENABLED} environment variable to
10+
* {@code true}.
11+
*/
12+
@Target({ElementType.TYPE, ElementType.METHOD})
13+
@Retention(RetentionPolicy.RUNTIME)
14+
@Documented
15+
@EnabledIfEnvironmentVariable(named = "TEST_PERF_ENABLED", matches = "(?i)true")
16+
@Test
17+
public @interface PerfTest {}

rsocket-test/src/main/java/io/rsocket/test/PingClient.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
import io.rsocket.RSocket;
2121
import io.rsocket.util.ByteBufPayload;
2222
import java.time.Duration;
23+
import java.util.function.BiFunction;
2324
import org.HdrHistogram.Recorder;
25+
import org.reactivestreams.Publisher;
2426
import reactor.core.publisher.Flux;
2527
import reactor.core.publisher.Mono;
2628

@@ -49,16 +51,26 @@ public Recorder startTracker(Duration interval) {
4951
return histogram;
5052
}
5153

52-
public Flux<Payload> startPingPong(int count, final Recorder histogram) {
54+
public Flux<Payload> requestResponsePingPong(int count, final Recorder histogram) {
55+
return pingPong(RSocket::requestResponse, count, histogram);
56+
}
57+
58+
public Flux<Payload> requestStreamPingPong(int count, final Recorder histogram) {
59+
return pingPong(RSocket::requestStream, count, histogram);
60+
}
61+
62+
Flux<Payload> pingPong(
63+
BiFunction<RSocket, ? super Payload, ? extends Publisher<Payload>> interaction,
64+
int count,
65+
final Recorder histogram) {
5366
return client
5467
.flatMapMany(
5568
rsocket ->
5669
Flux.range(1, count)
5770
.flatMap(
5871
i -> {
5972
long start = System.nanoTime();
60-
return rsocket
61-
.requestResponse(payload.retain())
73+
return Flux.from(interaction.apply(rsocket, payload.retain()))
6274
.doOnNext(Payload::release)
6375
.doFinally(
6476
signalType -> {

rsocket-test/src/main/java/io/rsocket/test/PingHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.SocketAcceptor;
2424
import io.rsocket.util.ByteBufPayload;
2525
import java.util.concurrent.ThreadLocalRandom;
26+
import reactor.core.publisher.Flux;
2627
import reactor.core.publisher.Mono;
2728

2829
public class PingHandler implements SocketAcceptor {
@@ -48,6 +49,12 @@ public Mono<Payload> requestResponse(Payload payload) {
4849
payload.release();
4950
return Mono.just(pong.retain());
5051
}
52+
53+
@Override
54+
public Flux<Payload> requestStream(Payload payload) {
55+
payload.release();
56+
return Flux.range(0, 100).map(v -> pong.retain());
57+
}
5158
});
5259
}
5360
}

rsocket-transport-local/src/test/java/io/rsocket/transport/local/LocalPingPong.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public static void main(String... args) {
4848
int count = 1_000_000_000;
4949

5050
pingClient
51-
.startPingPong(count, recorder)
51+
.requestResponsePingPong(count, recorder)
5252
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
5353
.blockLast();
5454
}

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPing.java

Lines changed: 59 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,30 +19,79 @@
1919
import io.rsocket.RSocket;
2020
import io.rsocket.RSocketFactory;
2121
import io.rsocket.frame.decoder.PayloadDecoder;
22+
import io.rsocket.test.PerfTest;
2223
import io.rsocket.test.PingClient;
2324
import io.rsocket.transport.netty.client.TcpClientTransport;
2425
import java.time.Duration;
2526
import org.HdrHistogram.Recorder;
27+
import org.junit.jupiter.api.BeforeEach;
28+
import org.junit.jupiter.api.Test;
2629
import reactor.core.publisher.Mono;
2730

31+
@PerfTest
2832
public final class TcpPing {
33+
private static final int INTERACTIONS_COUNT = 1_000_000_000;
34+
private static final int port = Integer.valueOf(System.getProperty("RSOCKET_TEST_PORT", "7878"));
2935

30-
public static void main(String... args) {
31-
Mono<RSocket> client =
32-
RSocketFactory.connect()
33-
.frameDecoder(PayloadDecoder.ZERO_COPY)
34-
.transport(TcpClientTransport.create(7878))
35-
.start();
36+
@BeforeEach
37+
void setUp() {
38+
System.out.println("Starting ping-pong test (TCP transport)");
39+
System.out.println("port: " + port);
40+
}
3641

37-
PingClient pingClient = new PingClient(client);
42+
@Test
43+
void requestResponseTest() {
44+
PingClient pingClient = newPingClient();
45+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
3846

47+
pingClient
48+
.requestResponsePingPong(INTERACTIONS_COUNT, recorder)
49+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
50+
.blockLast();
51+
}
52+
53+
@Test
54+
void requestStreamTest() {
55+
PingClient pingClient = newPingClient();
3956
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
4057

41-
int count = 1_000_000_000;
58+
pingClient
59+
.requestStreamPingPong(INTERACTIONS_COUNT, recorder)
60+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
61+
.blockLast();
62+
}
63+
64+
@Test
65+
void requestStreamResumableTest() {
66+
PingClient pingClient = newResumablePingClient();
67+
Recorder recorder = pingClient.startTracker(Duration.ofSeconds(1));
4268

4369
pingClient
44-
.startPingPong(count, recorder)
45-
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
70+
.requestStreamPingPong(INTERACTIONS_COUNT, recorder)
71+
.doOnTerminate(() -> System.out.println("Sent " + INTERACTIONS_COUNT + " messages."))
4672
.blockLast();
4773
}
74+
75+
private static PingClient newPingClient() {
76+
return newPingClient(false);
77+
}
78+
79+
private static PingClient newResumablePingClient() {
80+
return newPingClient(true);
81+
}
82+
83+
private static PingClient newPingClient(boolean isResumable) {
84+
RSocketFactory.ClientRSocketFactory clientRSocketFactory = RSocketFactory.connect();
85+
if (isResumable) {
86+
clientRSocketFactory.resume();
87+
}
88+
Mono<RSocket> rSocket =
89+
clientRSocketFactory
90+
.frameDecoder(PayloadDecoder.ZERO_COPY)
91+
.keepAlive(Duration.ofMinutes(1), Duration.ofMinutes(30), 3)
92+
.transport(TcpClientTransport.create(port))
93+
.start();
94+
95+
return new PingClient(rSocket);
96+
}
4897
}

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/TcpPongServer.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,23 @@
2222
import io.rsocket.transport.netty.server.TcpServerTransport;
2323

2424
public final class TcpPongServer {
25+
private static final boolean isResume =
26+
Boolean.valueOf(System.getProperty("RSOCKET_TEST_RESUME", "false"));
27+
private static final int port = Integer.valueOf(System.getProperty("RSOCKET_TEST_PORT", "7878"));
2528

2629
public static void main(String... args) {
27-
RSocketFactory.receive()
30+
System.out.println("Starting TCP ping-pong server");
31+
System.out.println("port: " + port);
32+
System.out.println("resume enabled: " + isResume);
33+
34+
RSocketFactory.ServerRSocketFactory serverRSocketFactory = RSocketFactory.receive();
35+
if (isResume) {
36+
serverRSocketFactory.resume();
37+
}
38+
serverRSocketFactory
2839
.frameDecoder(PayloadDecoder.ZERO_COPY)
2940
.acceptor(new PingHandler())
30-
.transport(TcpServerTransport.create(7878))
41+
.transport(TcpServerTransport.create(port))
3142
.start()
3243
.block()
3344
.onClose()

rsocket-transport-netty/src/test/java/io/rsocket/transport/netty/WebsocketPing.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public static void main(String... args) {
3737
int count = 1_000_000_000;
3838

3939
pingClient
40-
.startPingPong(count, recorder)
40+
.requestResponsePingPong(count, recorder)
4141
.doOnTerminate(() -> System.out.println("Sent " + count + " messages."))
4242
.blockLast();
4343
}

0 commit comments

Comments
 (0)