Skip to content

Commit 7e7f694

Browse files
committed
Refactored the Netty stuff to a separate util class
1 parent 4201ff1 commit 7e7f694

File tree

2 files changed

+226
-206
lines changed

2 files changed

+226
-206
lines changed
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package com.google.firebase.database.connection;
2+
3+
import static com.google.common.base.Preconditions.checkNotNull;
4+
import static com.google.common.base.Preconditions.checkState;
5+
6+
import com.google.common.util.concurrent.ThreadFactoryBuilder;
7+
import io.netty.bootstrap.Bootstrap;
8+
import io.netty.channel.Channel;
9+
import io.netty.channel.ChannelHandler;
10+
import io.netty.channel.ChannelHandlerContext;
11+
import io.netty.channel.ChannelInitializer;
12+
import io.netty.channel.ChannelPipeline;
13+
import io.netty.channel.EventLoopGroup;
14+
import io.netty.channel.SimpleChannelInboundHandler;
15+
import io.netty.channel.nio.NioEventLoopGroup;
16+
import io.netty.channel.socket.SocketChannel;
17+
import io.netty.channel.socket.nio.NioSocketChannel;
18+
import io.netty.handler.codec.http.DefaultHttpHeaders;
19+
import io.netty.handler.codec.http.FullHttpResponse;
20+
import io.netty.handler.codec.http.HttpClientCodec;
21+
import io.netty.handler.codec.http.HttpObjectAggregator;
22+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
23+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
24+
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
25+
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
26+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
27+
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
28+
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler;
29+
import io.netty.handler.ssl.SslContext;
30+
import io.netty.handler.ssl.SslContextBuilder;
31+
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
32+
import io.netty.util.CharsetUtil;
33+
34+
import java.net.URI;
35+
import java.util.concurrent.ThreadFactory;
36+
import javax.net.ssl.SSLException;
37+
38+
class NettyWebSocketClient implements WebsocketConnection.WSClient {
39+
40+
private final URI uri;
41+
private final SslContext sslContext;
42+
private final ChannelHandler channelHandler;
43+
private final EventLoopGroup group;
44+
45+
private Channel channel;
46+
47+
NettyWebSocketClient(
48+
URI uri, String userAgent,
49+
WebsocketConnection.WSClientEventHandler eventHandler) throws SSLException {
50+
this.uri = checkNotNull(uri);
51+
this.sslContext = SslContextBuilder.forClient()
52+
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();
53+
54+
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(
55+
uri, WebSocketVersion.V13, null, true,
56+
new DefaultHttpHeaders().add("User-Agent", userAgent));
57+
this.channelHandler = new WebSocketClientHandler(eventHandler, handshaker);
58+
59+
ThreadFactory factory = new ThreadFactoryBuilder()
60+
.setNameFormat("hkj-websocket-%d")
61+
.setDaemon(true)
62+
.build();
63+
this.group = new NioEventLoopGroup(1, factory);
64+
}
65+
66+
@Override
67+
public void connect() {
68+
checkState(channel == null, "channel already initialized");
69+
Bootstrap b = new Bootstrap();
70+
b.group(group)
71+
.channel(NioSocketChannel.class)
72+
.handler(new ChannelInitializer<SocketChannel>() {
73+
@Override
74+
protected void initChannel(SocketChannel ch) {
75+
ChannelPipeline p = ch.pipeline();
76+
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), 443));
77+
p.addLast(
78+
new HttpClientCodec(),
79+
new HttpObjectAggregator(8192),
80+
WebSocketClientCompressionHandler.INSTANCE,
81+
channelHandler);
82+
}
83+
});
84+
channel = b.connect(uri.getHost(), 443).channel();
85+
}
86+
87+
@Override
88+
public void close() {
89+
checkState(channel != null, "channel not initialized");
90+
try {
91+
channel.close();
92+
} finally {
93+
group.shutdownGracefully();
94+
// TODO(hkj): https://github.com/netty/netty/issues/7310
95+
}
96+
}
97+
98+
@Override
99+
public void send(String msg) {
100+
checkState(channel != null && channel.isActive(), "channel not connected for sending");
101+
channel.writeAndFlush(new TextWebSocketFrame(msg));
102+
}
103+
104+
private static class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
105+
106+
private final WebsocketConnection.WSClientEventHandler delegate;
107+
private final WebSocketClientHandshaker handshaker;
108+
109+
WebSocketClientHandler(
110+
WebsocketConnection.WSClientEventHandler delegate,
111+
WebSocketClientHandshaker handshaker) {
112+
this.delegate = checkNotNull(delegate);
113+
this.handshaker = checkNotNull(handshaker);
114+
}
115+
116+
@Override
117+
public void handlerAdded(ChannelHandlerContext context) {
118+
// Do nothing
119+
}
120+
121+
@Override
122+
public void channelActive(ChannelHandlerContext context) {
123+
handshaker.handshake(context.channel());
124+
}
125+
126+
@Override
127+
public void channelInactive(ChannelHandlerContext context) {
128+
try {
129+
delegate.onClose();
130+
} finally {
131+
context.close();
132+
}
133+
}
134+
135+
@Override
136+
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
137+
Channel channel = context.channel();
138+
if (!handshaker.isHandshakeComplete()) {
139+
checkState(message instanceof FullHttpResponse);
140+
handshaker.finishHandshake(channel, (FullHttpResponse) message);
141+
delegate.onOpen();
142+
return;
143+
}
144+
145+
if (message instanceof FullHttpResponse) {
146+
FullHttpResponse response = (FullHttpResponse) message;
147+
String error = String.format("Unexpected FullHttpResponse (status: %s; content: %s)",
148+
response.status().toString(), response.content().toString(CharsetUtil.UTF_8));
149+
throw new IllegalStateException(error);
150+
}
151+
152+
WebSocketFrame frame = (WebSocketFrame) message;
153+
if (frame instanceof TextWebSocketFrame) {
154+
delegate.onMessage(((TextWebSocketFrame) frame).text());
155+
} else if (frame instanceof CloseWebSocketFrame) {
156+
try {
157+
delegate.onClose();
158+
} finally {
159+
channel.close();
160+
}
161+
}
162+
}
163+
164+
@Override
165+
public void exceptionCaught(ChannelHandlerContext context, final Throwable cause) {
166+
try {
167+
delegate.onError(cause);
168+
} finally {
169+
context.close();
170+
}
171+
}
172+
}
173+
}

0 commit comments

Comments
 (0)