Skip to content

Commit 0726525

Browse files
authored
polishes code (#967)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 412edc4 commit 0726525

File tree

4 files changed

+17
-17
lines changed

4 files changed

+17
-17
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -385,13 +385,14 @@ private Mono<Void> acceptSetup(
385385
clientServerConnection,
386386
new InvalidSetupException(
387387
"Unsupported version: " + SetupFrameCodec.humanReadableVersion(setupFrame)));
388+
return clientServerConnection.onClose();
388389
}
389390

390391
boolean leaseEnabled = leasesSupplier != null;
391392
if (SetupFrameCodec.honorLease(setupFrame) && !leaseEnabled) {
392393
serverSetup.sendError(
393394
clientServerConnection, new InvalidSetupException("lease is not supported"));
394-
return Mono.empty();
395+
return clientServerConnection.onClose();
395396
}
396397

397398
return serverSetup.acceptRSocketSetup(

rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public Mono<Void> acceptRSocketSetup(
6565

6666
if (SetupFrameCodec.resumeEnabled(frame)) {
6767
sendError(duplexConnection, new UnsupportedSetupException("resume not supported"));
68-
return Mono.empty();
68+
return duplexConnection.onClose();
6969
} else {
7070
return then.apply(new DefaultKeepAliveHandler(duplexConnection), duplexConnection);
7171
}

rsocket-examples/src/main/java/io/rsocket/examples/transport/tcp/stream/ClientStreamingToServer.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616

1717
package io.rsocket.examples.transport.tcp.stream;
1818

19-
import io.rsocket.Payload;
2019
import io.rsocket.RSocket;
2120
import io.rsocket.SocketAcceptor;
2221
import io.rsocket.core.RSocketConnector;
2322
import io.rsocket.core.RSocketServer;
2423
import io.rsocket.transport.netty.client.TcpClientTransport;
25-
import io.rsocket.transport.netty.server.TcpServerTransport;
24+
import io.rsocket.transport.netty.server.WebsocketServerTransport;
2625
import io.rsocket.util.DefaultPayload;
2726
import java.time.Duration;
2827
import org.slf4j.Logger;
@@ -33,26 +32,28 @@ public final class ClientStreamingToServer {
3332

3433
private static final Logger logger = LoggerFactory.getLogger(ClientStreamingToServer.class);
3534

36-
public static void main(String[] args) {
35+
public static void main(String[] args) throws InterruptedException {
3736
RSocketServer.create(
3837
SocketAcceptor.forRequestStream(
3938
payload ->
4039
Flux.interval(Duration.ofMillis(100))
4140
.map(aLong -> DefaultPayload.create("Interval: " + aLong))))
42-
.bind(TcpServerTransport.create("localhost", 7000))
41+
.bind(WebsocketServerTransport.create("localhost", 7000))
4342
.subscribe();
4443

4544
RSocket socket =
4645
RSocketConnector.connectWith(TcpClientTransport.create("localhost", 7000)).block();
4746

48-
socket
49-
.requestStream(DefaultPayload.create("Hello"))
50-
.map(Payload::getDataUtf8)
51-
.doOnNext(logger::debug)
52-
.take(10)
53-
.then()
54-
.doFinally(signalType -> socket.dispose())
55-
.then()
56-
.block();
47+
// socket
48+
// .requestStream(DefaultPayload.create("Hello"))
49+
// .map(Payload::getDataUtf8)
50+
// .doOnNext(logger::debug)
51+
// .take(10)
52+
// .then()
53+
// .doFinally(signalType -> socket.dispose())
54+
// .then()
55+
// .block();
56+
57+
Thread.sleep(1000000);
5758
}
5859
}

rsocket-test/src/main/java/io/rsocket/test/LeaksTrackingByteBufAllocator.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
8080
}
8181

8282
if (!hasUnreleased) {
83-
System.out.println(tag + " all the buffers are released...");
8483
return this;
8584
}
8685

@@ -109,7 +108,6 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
109108
return checkResult;
110109
},
111110
tag);
112-
System.out.println(tag + " all the buffers are released...");
113111
} finally {
114112
tracker.clear();
115113
}

0 commit comments

Comments
 (0)