Skip to content

Commit eecbd6d

Browse files
committed
cleanups examples
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8c928ec commit eecbd6d

File tree

7 files changed

+24
-26
lines changed

7 files changed

+24
-26
lines changed

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/channel/ChannelEchoClient.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ public static void main(String[] args) {
4343
.map(s -> "Echo: " + s)
4444
.map(DefaultPayload::create));
4545

46-
RSocketServer.create(echoAcceptor)
47-
.bind(TcpServerTransport.create("localhost", 7000))
48-
.subscribe();
46+
RSocketServer.create(echoAcceptor).bindNow(TcpServerTransport.create("localhost", 7000));
4947

5048
RSocket socket =
5149
RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)).block();

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/client/RSocketClientExample.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public static void main(String[] args) {
3434
.bind(TcpServerTransport.create("localhost", 7000))
3535
.delaySubscription(Duration.ofSeconds(5))
3636
.doOnNext(cc -> logger.info("Server started on the address : {}", cc.address()))
37-
.subscribe();
37+
.block();
3838

3939
Mono<RSocket> source =
4040
RSocketConnector.create()

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/plugins/LimitRateInterceptorExample.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
3939
}
4040
}))
4141
.interceptors(registry -> registry.forResponder(LimitRateInterceptor.forResponder(64)))
42-
.bind(TcpServerTransport.create("localhost", 7000))
43-
.subscribe();
42+
.bindNow(TcpServerTransport.create("localhost", 7000));
4443

4544
RSocket socket =
4645
RSocketConnector.create()

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/requestresponse/HelloWorldClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ public Mono<Payload> requestResponse(Payload p) {
5050
};
5151

5252
RSocketServer.create(SocketAcceptor.with(rsocket))
53-
.bind(TcpServerTransport.create("localhost", 7000))
54-
.subscribe();
53+
.bindNow(TcpServerTransport.create("localhost", 7000));
5554

5655
RSocket socket =
5756
RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)).block();

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/resume/ResumeFileTransfer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,7 @@ public static void main(String[] args) {
6666
.log("server");
6767
}))
6868
.resume(resume)
69-
.bind(TcpServerTransport.create("localhost", 8000))
70-
.block();
69+
.bindNow(TcpServerTransport.create("localhost", 8000));
7170

7271
RSocket client =
7372
RSocketConnector.create()

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/ClientStreamingToServer.java

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
package io.rsocket.examples.transport.tcp.stream;
1818

19+
import io.rsocket.Payload;
1920
import io.rsocket.RSocket;
2021
import io.rsocket.SocketAcceptor;
2122
import io.rsocket.core.RSocketConnector;
2223
import io.rsocket.core.RSocketServer;
2324
import io.rsocket.transport.netty.client.TcpClientTransport;
24-
import io.rsocket.transport.netty.server.WebsocketServerTransport;
25+
import io.rsocket.transport.netty.server.TcpServerTransport;
2526
import io.rsocket.util.DefaultPayload;
2627
import java.time.Duration;
2728
import org.slf4j.Logger;
@@ -38,21 +39,24 @@ public static void main(String[] args) throws InterruptedException {
3839
payload ->
3940
Flux.interval(Duration.ofMillis(100))
4041
.map(aLong -> DefaultPayload.create("Interval: " + aLong))))
41-
.bind(WebsocketServerTransport.create("localhost", 7000))
42-
.subscribe();
42+
.bindNow(TcpServerTransport.create("localhost", 7000));
4343

4444
RSocket socket =
45-
RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)).block();
46-
47-
// socket
48-
// .requestStream(DefaultPayload.create("Hello"))
49-
// .map(Payload::getDataUtf8)
50-
// .doOnNext(logger::debug)
51-
// .take(10)
52-
// .then()
53-
// .doFinally(signalType -> socket.dispose())
54-
// .then()
55-
// .block();
45+
RSocketConnector.create()
46+
.setupPayload(DefaultPayload.create("test", "test"))
47+
.connect(TcpClientTransport.create("localhost", 7000))
48+
.block();
49+
50+
final Payload payload = DefaultPayload.create("Hello");
51+
socket
52+
.requestStream(payload)
53+
.map(Payload::getDataUtf8)
54+
.doOnNext(logger::debug)
55+
.take(10)
56+
.then()
57+
.doFinally(signalType -> socket.dispose())
58+
.then()
59+
.block();
5660

5761
Thread.sleep(1000000);
5862
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/ServerStreamingToClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,7 @@ public static void main(String[] args) {
4343

4444
return Mono.just(new RSocket() {});
4545
})
46-
.bind(TcpServerTransport.create("localhost", 7000))
47-
.subscribe();
46+
.bindNow(TcpServerTransport.create("localhost", 7000));
4847

4948
RSocket rsocket =
5049
RSocketConnector.create()

0 commit comments

Comments
 (0)