Skip to content

Commit 8a8d5cc

Browse files
OlegDokukaOlegDokuka
authored andcommitted
wip
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: OlegDokuka <[email protected]>
1 parent cb811cf commit 8a8d5cc

File tree

6 files changed

+133
-10
lines changed

6 files changed

+133
-10
lines changed

build.gradle

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@ subprojects {
3333
apply plugin: 'com.github.sherter.google-java-format'
3434
apply plugin: 'com.github.vlsi.gradle-extensions'
3535

36-
ext['reactor-bom.version'] = '2020.0.32'
36+
ext['reactor-bom.version'] = '2020.0.39'
3737
ext['logback.version'] = '1.2.10'
38-
ext['netty-bom.version'] = '4.1.93.Final'
38+
ext['netty-bom.version'] = '4.1.106.Final'
3939
ext['netty-boringssl.version'] = '2.0.61.Final'
4040
ext['hdrhistogram.version'] = '2.1.12'
4141
ext['mockito.version'] = '4.11.0'

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,8 +463,14 @@ private Mono<Void> acceptSetup(
463463
return interceptors
464464
.initSocketAcceptor(acceptor)
465465
.accept(setupPayload, wrappedRSocketRequester)
466-
.doOnError(
467-
err -> serverSetup.sendError(wrappedDuplexConnection, rejectedSetupError(err)))
466+
.onErrorResume(
467+
err ->
468+
Mono.fromRunnable(
469+
() ->
470+
serverSetup.sendError(
471+
wrappedDuplexConnection, rejectedSetupError(err)))
472+
.then(wrappedDuplexConnection.onClose())
473+
.then(Mono.error(err)))
468474
.doOnNext(
469475
rSocketHandler -> {
470476
RSocket wrappedRSocketHandler = interceptors.initResponder(rSocketHandler);

rsocket-core/src/main/java/io/rsocket/frame/FrameUtil.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,31 @@ public static String toString(ByteBuf frame) {
3333
if (FrameHeaderCodec.hasMetadata(frame)) {
3434
payload.append("\nMetadata:\n");
3535

36-
ByteBufUtil.appendPrettyHexDump(payload, getMetadata(frame, frameType));
36+
ByteBuf metadata = getMetadata(frame, frameType);
37+
if (metadata.readableBytes() < 100) {
38+
ByteBufUtil.appendPrettyHexDump(payload, metadata);
39+
} else {
40+
payload.append(
41+
" +-------------------------------------------------+\n"
42+
+ " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |\n"
43+
+ "+--------+-------------------------------------------------+----------------+\n"
44+
+ "|00000000| too large payload | |\n"
45+
+ "+--------+-------------------------------------------------+----------------+\n");
46+
}
3747
}
3848

3949
payload.append("\nData:\n");
40-
ByteBufUtil.appendPrettyHexDump(payload, getData(frame, frameType));
50+
ByteBuf data = getData(frame, frameType);
51+
if (data.readableBytes() < 100) {
52+
ByteBufUtil.appendPrettyHexDump(payload, data);
53+
} else {
54+
payload.append(
55+
" +-------------------------------------------------+\n"
56+
+ " | 0 1 2 3 4 5 6 7 8 9 a b c d e f |\n"
57+
+ "+--------+-------------------------------------------------+----------------+\n"
58+
+ "|00000000| too large payload | |\n"
59+
+ "+--------+-------------------------------------------------+----------------+\n");
60+
}
4161

4262
return payload.toString();
4363
}

rsocket-core/src/main/java/io/rsocket/resume/ServerRSocketSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ public void setKeepAliveSupport(KeepAliveSupport keepAliveSupport) {
287287

288288
@Override
289289
public void dispose() {
290+
if (logger.isDebugEnabled()) {
291+
logger.debug("Side[server]|Session[{}]. Disposing session", session);
292+
}
290293
Operators.terminate(S, this);
291294
resumableConnection.dispose();
292295
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package io.rsocket.integration;
2+
3+
import io.rsocket.Payload;
4+
import io.rsocket.RSocket;
5+
import io.rsocket.core.RSocketConnector;
6+
import io.rsocket.core.RSocketServer;
7+
import io.rsocket.exceptions.RejectedSetupException;
8+
import io.rsocket.transport.netty.client.TcpClientTransport;
9+
import io.rsocket.transport.netty.server.CloseableChannel;
10+
import io.rsocket.transport.netty.server.TcpServerTransport;
11+
import io.rsocket.util.DefaultPayload;
12+
import org.junit.jupiter.api.Test;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
import reactor.core.publisher.Hooks;
16+
import reactor.core.publisher.Mono;
17+
import reactor.netty.tcp.TcpClient;
18+
import reactor.netty.tcp.TcpServer;
19+
import reactor.test.StepVerifier;
20+
21+
public class AuthenticationTest {
22+
23+
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationTest.class);
24+
private static final int PORT = 23200;
25+
26+
@Test
27+
void authTest() {
28+
Hooks.onOperatorDebug();
29+
createServer().block();
30+
RSocket rsocketClient = createClient().block();
31+
32+
StepVerifier.create(rsocketClient.requestResponse(DefaultPayload.create("Client: Hello")))
33+
.expectError(RejectedSetupException.class)
34+
.verify();
35+
}
36+
37+
private static Mono<CloseableChannel> createServer() {
38+
LOG.info("Starting server at port {}", PORT);
39+
RSocketServer rSocketServer =
40+
RSocketServer.create((connectionSetupPayload, rSocket) -> Mono.just(new MyServerRsocket()));
41+
42+
TcpServer tcpServer = TcpServer.create().host("localhost").port(PORT);
43+
44+
return rSocketServer
45+
.interceptors(
46+
interceptorRegistry ->
47+
interceptorRegistry.forSocketAcceptor(
48+
socketAcceptor ->
49+
(setup, sendingSocket) -> {
50+
if (true) { // TODO here would be an authentication check based on the
51+
// setup payload
52+
return Mono.error(new RejectedSetupException("ACCESS_DENIED"));
53+
} else {
54+
return socketAcceptor.accept(setup, sendingSocket);
55+
}
56+
}))
57+
.bind(TcpServerTransport.create(tcpServer))
58+
.doOnNext(closeableChannel -> LOG.info("RSocket server started."));
59+
}
60+
61+
private static Mono<RSocket> createClient() {
62+
LOG.info("Connecting....");
63+
return RSocketConnector.create()
64+
.connect(TcpClientTransport.create(TcpClient.create().host("localhost").port(PORT)))
65+
.doOnNext(rSocket -> LOG.info("Successfully connected to server"))
66+
.doOnError(throwable -> LOG.error("Failed to connect to server"));
67+
}
68+
69+
public static class MyServerRsocket implements RSocket {
70+
private static final Logger LOG = LoggerFactory.getLogger(MyServerRsocket.class);
71+
72+
@Override
73+
public Mono<Payload> requestResponse(Payload payload) {
74+
LOG.info("Got a request with payload: {}", payload.getDataUtf8());
75+
return Mono.just("Response data blah blah blah").map(DefaultPayload::create);
76+
}
77+
}
78+
}

rsocket-transport-netty/src/test/resources/logback-test.xml

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,26 @@
2323
</encoder>
2424
</appender>
2525

26+
<timestamp key="bySecond" datePattern="yyyyMMdd'T'HHmmss"/>
27+
28+
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
29+
<!-- use the previously created timestamp to create a uniquely
30+
named log file -->
31+
<file>log-${bySecond}.txt</file>
32+
<encoder>
33+
<pattern>%date{HH:mm:ss.SSS} %-10thread %-42logger %msg%n</pattern>
34+
</encoder>
35+
</appender>
36+
37+
<appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
38+
<appender-ref ref="FILE" />
39+
</appender>
40+
2641
<logger name="io.rsocket.transport.netty" level="INFO"/>
27-
<logger name="io.rsocket.FrameLogger" level="INFO"/>
42+
<logger name="io.rsocket.FrameLogger" level="DEBUG" additivity="false">
43+
<appender-ref ref="ASYNC" />
44+
</logger>
2845
<logger name="io.rsocket.fragmentation.FragmentationDuplexConnection" level="INFO"/>
29-
<logger name="io.rsocket.transport.netty" level="INFO"/>
30-
<logger name="io.rsocket.FrameLogger" level="INFO"/>
3146
<logger name="io.rsocket.core.RSocketRequester" level="DEBUG"/>
3247
<logger name="io.rsocket.core.RSocketResponder" level="DEBUG"/>
3348
<logger name="io.rsocket.test.TransportTest" level="DEBUG"/>
@@ -36,8 +51,9 @@
3651
<logger name="io.rsocket.resume.ResumableDuplexConnection" level="DEBUG"/>
3752
<logger name="io.rsocket.resume.InMemoryResumableFramesStore" level="DEBUG"/>
3853

39-
<root level="INFO">
54+
<root level="DEBUG">
4055
<appender-ref ref="STDOUT"/>
56+
<appender-ref ref="ASYNC"/>
4157
</root>
4258

4359
</configuration>

0 commit comments

Comments
 (0)