|
19 | 19 | import static io.rsocket.frame.FrameLengthFlyweight.FRAME_LENGTH_MASK;
|
20 | 20 |
|
21 | 21 | import io.netty.buffer.ByteBufAllocator;
|
| 22 | +import io.netty.buffer.Unpooled; |
| 23 | +import io.netty.channel.ChannelHandlerContext; |
| 24 | +import io.netty.channel.ChannelInboundHandlerAdapter; |
| 25 | +import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; |
| 26 | +import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; |
| 27 | +import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; |
22 | 28 | import io.rsocket.DuplexConnection;
|
23 | 29 | import io.rsocket.fragmentation.FragmentationDuplexConnection;
|
24 | 30 | import io.rsocket.transport.ClientTransport;
|
|
30 | 36 | import java.util.Map;
|
31 | 37 | import java.util.Objects;
|
32 | 38 | import java.util.function.Supplier;
|
| 39 | +import org.slf4j.Logger; |
| 40 | +import org.slf4j.LoggerFactory; |
33 | 41 | import reactor.core.publisher.Mono;
|
34 | 42 | import reactor.netty.Connection;
|
35 | 43 | import reactor.netty.http.server.HttpServer;
|
|
40 | 48 | */
|
41 | 49 | public final class WebsocketServerTransport
|
42 | 50 | implements ServerTransport<CloseableChannel>, TransportHeaderAware {
|
| 51 | + private static final Logger logger = LoggerFactory.getLogger(WebsocketServerTransport.class); |
43 | 52 |
|
44 | 53 | private final HttpServer server;
|
45 | 54 |
|
@@ -95,10 +104,36 @@ public static WebsocketServerTransport create(InetSocketAddress address) {
|
95 | 104 | * @return a new instance
|
96 | 105 | * @throws NullPointerException if {@code server} is {@code null}
|
97 | 106 | */
|
98 |
| - public static WebsocketServerTransport create(HttpServer server) { |
| 107 | + public static WebsocketServerTransport create(final HttpServer server) { |
99 | 108 | Objects.requireNonNull(server, "server must not be null");
|
100 | 109 |
|
101 |
| - return new WebsocketServerTransport(server); |
| 110 | + return new WebsocketServerTransport( |
| 111 | + server.tcpConfiguration( |
| 112 | + tcpServer -> |
| 113 | + tcpServer.doOnConnection( |
| 114 | + connection -> |
| 115 | + connection.addHandlerLast( |
| 116 | + new ChannelInboundHandlerAdapter() { |
| 117 | + @Override |
| 118 | + public void channelRead(ChannelHandlerContext ctx, Object msg) |
| 119 | + throws Exception { |
| 120 | + if (msg instanceof PongWebSocketFrame) { |
| 121 | + logger.debug("received WebSocket Pong Frame"); |
| 122 | + } else if (msg instanceof PingWebSocketFrame) { |
| 123 | + logger.debug( |
| 124 | + "received WebSocket Ping Frame - sending Pong Frame"); |
| 125 | + PongWebSocketFrame pongWebSocketFrame = |
| 126 | + new PongWebSocketFrame(Unpooled.EMPTY_BUFFER); |
| 127 | + ctx.writeAndFlush(pongWebSocketFrame); |
| 128 | + } else if (msg instanceof CloseWebSocketFrame) { |
| 129 | + logger.warn( |
| 130 | + "received WebSocket Close Frame - connection is closing"); |
| 131 | + ctx.close(); |
| 132 | + } else { |
| 133 | + ctx.fireChannelRead(msg); |
| 134 | + } |
| 135 | + } |
| 136 | + })))); |
102 | 137 | }
|
103 | 138 |
|
104 | 139 | @Override
|
|
0 commit comments