Skip to content

Commit 7187f05

Browse files
authored
WebSocket Implementation Based on Netty (#91)
* Prototyping Firestore integration for Java * Some code cleanup * Updating comments * Code cleanup * Removed getInstance() methods from FirestoreClient and added getFirestore() methods. * Minor cleanup * Adding Firestore OAuth scopes * Some documentation updates * Reverted a doc change * Updating GCS dependency version (#89) * Netty-based websocket impl * Refactored the Netty stuff to a separate util class * More code clean up and removing tubesock package * Using the SDK ThreadManager for DB websocket threads * More code cleanup and error checks * Handling connection drop events * Handling connection loss events * Adding documentation * Removed string list reader * Removing redundant whitespace * Revert "Using the SDK ThreadManager for DB websocket threads" This reverts commit 8e90610. * src/main/java/com/google/firebase/database/connection/util/StringListReader.java * Added StringListReader back * Re-updated websocket client * reverting more syntactic changes * Adding new line * Added documentation; Using a more secure trust manager; Other cleanup * Removed unused import * Using the port on URI when available * Removing websocket commpression handler * Declaring Netty dependencies explicitly (using the same version as gRPC)
1 parent 3283aa8 commit 7187f05

25 files changed

+447
-2019
lines changed

pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
6060
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
6161
<skipUTs>${skipTests}</skipUTs>
62+
<netty.version>4.1.14.Final</netty.version>
6263
</properties>
6364

6465
<scm>
@@ -429,6 +430,21 @@
429430
<artifactId>slf4j-api</artifactId>
430431
<version>1.7.25</version>
431432
</dependency>
433+
<dependency>
434+
<groupId>io.netty</groupId>
435+
<artifactId>netty-codec-http</artifactId>
436+
<version>${netty.version}</version>
437+
</dependency>
438+
<dependency>
439+
<groupId>io.netty</groupId>
440+
<artifactId>netty-handler</artifactId>
441+
<version>${netty.version}</version>
442+
</dependency>
443+
<dependency>
444+
<groupId>io.netty</groupId>
445+
<artifactId>netty-transport</artifactId>
446+
<version>${netty.version}</version>
447+
</dependency>
432448

433449
<!-- Test Dependencies -->
434450
<dependency>

src/main/java/com/google/firebase/database/connection/ConnectionContext.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
package com.google.firebase.database.connection;
1818

1919
import com.google.firebase.database.logging.Logger;
20-
import com.google.firebase.database.tubesock.ThreadConfig;
2120

2221
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.ThreadFactory;
2323

2424
public class ConnectionContext {
2525

@@ -29,7 +29,7 @@ public class ConnectionContext {
2929
private final boolean persistenceEnabled;
3030
private final String clientSdkVersion;
3131
private final String userAgent;
32-
private final ThreadConfig threadConfig;
32+
private final ThreadFactory threadFactory;
3333

3434
public ConnectionContext(
3535
Logger logger,
@@ -38,14 +38,14 @@ public ConnectionContext(
3838
boolean persistenceEnabled,
3939
String clientSdkVersion,
4040
String userAgent,
41-
ThreadConfig threadConfig) {
41+
ThreadFactory threadFactory) {
4242
this.logger = logger;
4343
this.authTokenProvider = authTokenProvider;
4444
this.executorService = executorService;
4545
this.persistenceEnabled = persistenceEnabled;
4646
this.clientSdkVersion = clientSdkVersion;
4747
this.userAgent = userAgent;
48-
this.threadConfig = threadConfig;
48+
this.threadFactory = threadFactory;
4949
}
5050

5151
public Logger getLogger() {
@@ -72,7 +72,7 @@ public String getUserAgent() {
7272
return this.userAgent;
7373
}
7474

75-
public ThreadConfig getThreadConfig() {
76-
return threadConfig;
75+
public ThreadFactory getThreadFactory() {
76+
return threadFactory;
7777
}
7878
}
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
package com.google.firebase.database.connection;
2+
3+
import static com.google.common.base.Preconditions.checkArgument;
4+
import static com.google.common.base.Preconditions.checkNotNull;
5+
import static com.google.common.base.Preconditions.checkState;
6+
7+
import com.google.common.base.Strings;
8+
import com.google.firebase.internal.GaeThreadFactory;
9+
import com.google.firebase.internal.RevivingScheduledExecutor;
10+
import io.netty.bootstrap.Bootstrap;
11+
import io.netty.channel.Channel;
12+
import io.netty.channel.ChannelFuture;
13+
import io.netty.channel.ChannelFutureListener;
14+
import io.netty.channel.ChannelHandler;
15+
import io.netty.channel.ChannelHandlerContext;
16+
import io.netty.channel.ChannelInitializer;
17+
import io.netty.channel.ChannelPipeline;
18+
import io.netty.channel.EventLoopGroup;
19+
import io.netty.channel.SimpleChannelInboundHandler;
20+
import io.netty.channel.nio.NioEventLoopGroup;
21+
import io.netty.channel.socket.SocketChannel;
22+
import io.netty.channel.socket.nio.NioSocketChannel;
23+
import io.netty.handler.codec.http.DefaultHttpHeaders;
24+
import io.netty.handler.codec.http.FullHttpResponse;
25+
import io.netty.handler.codec.http.HttpClientCodec;
26+
import io.netty.handler.codec.http.HttpObjectAggregator;
27+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
28+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
29+
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
30+
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
31+
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
32+
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
33+
import io.netty.handler.ssl.SslContext;
34+
import io.netty.handler.ssl.SslContextBuilder;
35+
36+
import java.net.URI;
37+
import java.security.KeyStore;
38+
import java.util.concurrent.ExecutorService;
39+
import java.util.concurrent.ThreadFactory;
40+
import javax.net.ssl.TrustManagerFactory;
41+
42+
/**
43+
* A {@link WebsocketConnection.WSClient} implementation based on the Netty framework. Uses
44+
* a single-threaded NIO event loop to read and write bytes from a WebSocket connection. Netty
45+
* handles all the low-level IO, SSL and WebSocket handshake, and other protocol-specific details.
46+
*
47+
* <p>This implementation does not initiate connection close on its own. In case of errors or loss
48+
* of connectivity, it notifies the higher layer ({@link WebsocketConnection}), which then decides
49+
* whether to initiate a connection tear down.
50+
*/
51+
class NettyWebSocketClient implements WebsocketConnection.WSClient {
52+
53+
private static final int DEFAULT_WSS_PORT = 443;
54+
55+
private final URI uri;
56+
private final WebsocketConnection.WSClientEventHandler eventHandler;
57+
private final ChannelHandler channelHandler;
58+
private final ExecutorService executorService;
59+
private final EventLoopGroup group;
60+
61+
private Channel channel;
62+
63+
NettyWebSocketClient(
64+
URI uri, String userAgent, ThreadFactory threadFactory,
65+
WebsocketConnection.WSClientEventHandler eventHandler) {
66+
this.uri = checkNotNull(uri, "uri must not be null");
67+
this.eventHandler = checkNotNull(eventHandler, "event handler must not be null");
68+
this.channelHandler = new WebSocketClientHandler(uri, userAgent, eventHandler);
69+
this.executorService = new RevivingScheduledExecutor(
70+
threadFactory, "firebase-websocket-worker", GaeThreadFactory.isAvailable());
71+
this.group = new NioEventLoopGroup(1, this.executorService);
72+
}
73+
74+
@Override
75+
public void connect() {
76+
checkState(channel == null, "channel already initialized");
77+
try {
78+
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(
79+
TrustManagerFactory.getDefaultAlgorithm());
80+
trustFactory.init((KeyStore) null);
81+
final SslContext sslContext = SslContextBuilder.forClient()
82+
.trustManager(trustFactory).build();
83+
Bootstrap bootstrap = new Bootstrap();
84+
final int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_WSS_PORT;
85+
bootstrap.group(group)
86+
.channel(NioSocketChannel.class)
87+
.handler(new ChannelInitializer<SocketChannel>() {
88+
@Override
89+
protected void initChannel(SocketChannel ch) {
90+
ChannelPipeline p = ch.pipeline();
91+
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), port));
92+
p.addLast(
93+
new HttpClientCodec(),
94+
// Set the max size for the HTTP responses. This only applies to the WebSocket
95+
// handshake response from the server.
96+
new HttpObjectAggregator(32 * 1024),
97+
channelHandler);
98+
}
99+
});
100+
101+
ChannelFuture channelFuture = bootstrap.connect(uri.getHost(), port);
102+
this.channel = channelFuture.channel();
103+
channelFuture.addListener(
104+
new ChannelFutureListener() {
105+
@Override
106+
public void operationComplete(ChannelFuture future) throws Exception {
107+
if (!future.isSuccess()) {
108+
eventHandler.onError(future.cause());
109+
}
110+
}
111+
}
112+
);
113+
} catch (Exception e) {
114+
eventHandler.onError(e);
115+
}
116+
}
117+
118+
@Override
119+
public void close() {
120+
checkState(channel != null, "channel not initialized");
121+
try {
122+
channel.close();
123+
} finally {
124+
// The following may leave an active threadDeathWatcher daemon behind. That can be cleaned
125+
// up at a higher level if necessary. See https://github.com/netty/netty/issues/7310.
126+
group.shutdownGracefully();
127+
executorService.shutdown();
128+
}
129+
}
130+
131+
@Override
132+
public void send(String msg) {
133+
checkState(channel != null && channel.isActive(), "channel not connected for sending");
134+
channel.writeAndFlush(new TextWebSocketFrame(msg));
135+
}
136+
137+
/**
138+
* Handles low-level IO events. These events fire on the firebase-websocket-worker thread. We
139+
* notify the {@link WebsocketConnection} on all events, which then hands them off to the
140+
* RunLoop for further processing.
141+
*/
142+
private static class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {
143+
144+
private final WebsocketConnection.WSClientEventHandler delegate;
145+
private final WebSocketClientHandshaker handshaker;
146+
147+
WebSocketClientHandler(
148+
URI uri, String userAgent, WebsocketConnection.WSClientEventHandler delegate) {
149+
this.delegate = checkNotNull(delegate, "delegate must not be null");
150+
checkArgument(!Strings.isNullOrEmpty(userAgent), "user agent must not be null or empty");
151+
this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(
152+
uri, WebSocketVersion.V13, null, true,
153+
new DefaultHttpHeaders().add("User-Agent", userAgent));
154+
}
155+
156+
@Override
157+
public void handlerAdded(ChannelHandlerContext context) {
158+
// Do nothing
159+
}
160+
161+
@Override
162+
public void channelActive(ChannelHandlerContext context) {
163+
handshaker.handshake(context.channel());
164+
}
165+
166+
@Override
167+
public void channelInactive(ChannelHandlerContext context) {
168+
delegate.onClose();
169+
}
170+
171+
@Override
172+
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
173+
Channel channel = context.channel();
174+
if (message instanceof FullHttpResponse) {
175+
checkState(!handshaker.isHandshakeComplete());
176+
try {
177+
handshaker.finishHandshake(channel, (FullHttpResponse) message);
178+
delegate.onOpen();
179+
} catch (WebSocketHandshakeException e) {
180+
delegate.onError(e);
181+
}
182+
} else if (message instanceof TextWebSocketFrame) {
183+
delegate.onMessage(((TextWebSocketFrame) message).text());
184+
} else {
185+
checkState(message instanceof CloseWebSocketFrame);
186+
delegate.onClose();
187+
}
188+
}
189+
190+
@Override
191+
public void exceptionCaught(ChannelHandlerContext context, final Throwable cause) {
192+
delegate.onError(cause);
193+
}
194+
}
195+
}

src/main/java/com/google/firebase/database/connection/PersistentConnectionImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ public void onDisconnect(Connection.DisconnectReason reason) {
283283
this.connectionState = ConnectionState.Disconnected;
284284
this.realtime = null;
285285
this.hasOnDisconnects = false;
286+
requestCBHash.clear();
286287
if (inactivityTimer != null) {
287288
logger.debug("cancelling idle time checker");
288289
inactivityTimer.cancel(false);

0 commit comments

Comments
 (0)