Skip to content

Commit ec8085d

Browse files
committed
added logic to handle different websocket frames
Signed-off-by: Robert Roeser <[email protected]>
1 parent 8178d53 commit ec8085d

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

rsocket-transport-netty/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ if (osdetector.classifier in ["linux-x86_64"] || ["osx-x86_64"] || ["windows-x86
3030
dependencies {
3131
api project(':rsocket-core')
3232
api 'io.projectreactor.netty:reactor-netty'
33+
implementation 'org.slf4j:slf4j-api'
3334

3435
compileOnly 'com.google.code.findbugs:jsr305'
3536

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/server/WebsocketServerTransport.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK;
2020

2121
import io.netty.buffer.ByteBufAllocator;
22+
import io.netty.buffer.Unpooled;
23+
import io.netty.channel.ChannelHandlerContext;
24+
import io.netty.channel.ChannelInboundHandlerAdapter;
25+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
26+
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
27+
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
2228
import io.rsocket.DuplexConnection;
2329
import io.rsocket.fragmentation.FragmentationDuplexConnection;
2430
import io.rsocket.transport.ClientTransport;
@@ -30,6 +36,8 @@
3036
import java.util.Map;
3137
import java.util.Objects;
3238
import java.util.function.Supplier;
39+
import org.slf4j.Logger;
40+
import org.slf4j.LoggerFactory;
3341
import reactor.core.publisher.Mono;
3442
import reactor.netty.Connection;
3543
import reactor.netty.http.server.HttpServer;
@@ -40,6 +48,7 @@
4048
*/
4149
public final class WebsocketServerTransport
4250
implements ServerTransport<CloseableChannel>, TransportHeaderAware {
51+
private static final Logger logger = LoggerFactory.getLogger(WebsocketServerTransport.class);
4352

4453
private final HttpServer server;
4554

@@ -95,10 +104,36 @@ public static WebsocketServerTransport create(InetSocketAddress address) {
95104
* @return a new instance
96105
* @throws NullPointerException if {@code server} is {@code null}
97106
*/
98-
public static WebsocketServerTransport create(HttpServer server) {
107+
public static WebsocketServerTransport create(final HttpServer server) {
99108
Objects.requireNonNull(server, "server must not be null");
100109

101-
return new WebsocketServerTransport(server);
110+
return new WebsocketServerTransport(
111+
server.tcpConfiguration(
112+
tcpServer ->
113+
tcpServer.doOnConnection(
114+
connection ->
115+
connection.addHandlerLast(
116+
new ChannelInboundHandlerAdapter() {
117+
@Override
118+
public void channelRead(ChannelHandlerContext ctx, Object msg)
119+
throws Exception {
120+
if (msg instanceof PongWebSocketFrame) {
121+
logger.debug("received WebSocket Pong Frame");
122+
} else if (msg instanceof PingWebSocketFrame) {
123+
logger.debug(
124+
"received WebSocket Ping Frame - sending Pong Frame");
125+
PongWebSocketFrame pongWebSocketFrame =
126+
new PongWebSocketFrame(Unpooled.EMPTY_BUFFER);
127+
ctx.writeAndFlush(pongWebSocketFrame);
128+
} else if (msg instanceof CloseWebSocketFrame) {
129+
logger.warn(
130+
"received WebSocket Close Frame - connection is closing");
131+
ctx.close();
132+
} else {
133+
ctx.fireChannelRead(msg);
134+
}
135+
}
136+
}))));
102137
}
103138

104139
@Override

0 commit comments

Comments
 (0)