Skip to content

Commit 932a753

Browse files
committed
channel load test
Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent fb10188 commit 932a753

File tree

1 file changed

+93
-0
lines changed

1 file changed

+93
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.rsocket.transport.netty;
2+
3+
import io.rsocket.AbstractRSocket;
4+
import io.rsocket.Payload;
5+
import io.rsocket.RSocket;
6+
import io.rsocket.RSocketFactory;
7+
import io.rsocket.resume.InMemoryResumableFramesStore;
8+
import io.rsocket.resume.PeriodicResumeStrategy;
9+
import io.rsocket.transport.ClientTransport;
10+
import io.rsocket.transport.netty.client.TcpClientTransport;
11+
import io.rsocket.transport.netty.server.CloseableChannel;
12+
import io.rsocket.transport.netty.server.TcpServerTransport;
13+
import io.rsocket.util.DefaultPayload;
14+
import org.junit.jupiter.api.Test;
15+
import org.reactivestreams.Publisher;
16+
import reactor.core.publisher.Flux;
17+
import reactor.core.publisher.Mono;
18+
import reactor.core.scheduler.Schedulers;
19+
import reactor.test.StepVerifier;
20+
21+
import java.net.InetSocketAddress;
22+
import java.time.Duration;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.Consumer;
25+
26+
public class ChannelLoadTest {
27+
28+
@Test
29+
void channel() {
30+
CloseableChannel closeableChannel = newServerRSocket().block();
31+
RSocket rSocket = newClientRSocket(closeableChannel.address(), err -> {
32+
throw new RuntimeException(err);
33+
}).block();
34+
35+
StepVerifier.create(
36+
rSocket
37+
.requestChannel(testRequest())
38+
.take(Duration.ofSeconds(600))
39+
.map(Payload::getDataUtf8)
40+
.timeout(Duration.ofSeconds(5))
41+
.then()
42+
.doFinally(s -> closeableChannel.dispose()))
43+
.expectComplete()
44+
.verify();
45+
}
46+
47+
private static Flux<Payload> testRequest() {
48+
return Flux.interval(Duration.ofMillis(50))
49+
.map(v -> DefaultPayload.create("client_request"))
50+
.onBackpressureDrop();
51+
}
52+
53+
private static Mono<RSocket> newClientRSocket(InetSocketAddress address,
54+
Consumer<Throwable> errConsumer) {
55+
return RSocketFactory.connect()
56+
.keepAliveTickPeriod(Duration.ofSeconds(30))
57+
.keepAliveAckTimeout(Duration.ofMinutes(5))
58+
.errorConsumer(errConsumer)
59+
.transport(TcpClientTransport.create(address))
60+
.start();
61+
}
62+
63+
private static Mono<CloseableChannel> newServerRSocket() {
64+
return RSocketFactory.receive()
65+
.acceptor((setupPayload, rSocket) -> Mono.just(new TestResponderRSocket()))
66+
.transport(TcpServerTransport.create("localhost", 0))
67+
.start();
68+
}
69+
70+
private static class TestResponderRSocket extends AbstractRSocket {
71+
72+
AtomicInteger counter = new AtomicInteger();
73+
74+
@Override
75+
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
76+
return duplicate(
77+
Flux.interval(Duration.ofMillis(1))
78+
.onBackpressureLatest()
79+
.publishOn(Schedulers.elastic()),
80+
40)
81+
.map(v -> DefaultPayload.create(String.valueOf(counter.getAndIncrement())))
82+
.takeUntilOther(Flux.from(payloads).then());
83+
}
84+
85+
private <T> Flux<T> duplicate(Flux<T> f, int n) {
86+
Flux<T> r = Flux.empty();
87+
for (int i = 0; i < n; i++) {
88+
r = r.mergeWith(f);
89+
}
90+
return r;
91+
}
92+
}
93+
}

0 commit comments

Comments
 (0)