Skip to content

WebSocket Implementation Based on Netty #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Nov 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
705abcd
Prototyping Firestore integration for Java
hiranya911 Aug 31, 2017
084453e
Some code cleanup
hiranya911 Aug 31, 2017
39dec4c
Updating comments
hiranya911 Sep 2, 2017
cd8030c
Code cleanup
hiranya911 Sep 2, 2017
9be05c9
Merge branch 'master' into hkj-firestore-java
hiranya911 Sep 7, 2017
dfc6bec
Removed getInstance() methods from FirestoreClient and added getFires…
hiranya911 Sep 7, 2017
ef3881f
Merge branch 'hkj-firestore-java' of github.com:FirebasePrivate/fireb…
hiranya911 Sep 7, 2017
0cf020b
Merged with master
hiranya911 Oct 6, 2017
ab3469a
Minor cleanup
hiranya911 Oct 6, 2017
d64b4f6
Adding Firestore OAuth scopes
hiranya911 Oct 6, 2017
d87f364
Merge branch 'master' into hkj-firestore-java
hiranya911 Oct 9, 2017
8ed4e5f
Some documentation updates
hiranya911 Oct 9, 2017
48104da
Reverted a doc change
hiranya911 Oct 9, 2017
1771a01
Updating GCS dependency version (#89)
hiranya911 Oct 11, 2017
4201ff1
Netty-based websocket impl
hiranya911 Oct 17, 2017
7e7f694
Refactored the Netty stuff to a separate util class
hiranya911 Oct 17, 2017
7a74047
More code clean up and removing tubesock package
hiranya911 Oct 17, 2017
8e90610
Using the SDK ThreadManager for DB websocket threads
hiranya911 Oct 17, 2017
b910380
More code cleanup and error checks
hiranya911 Oct 17, 2017
a227cae
Handling connection drop events
hiranya911 Oct 19, 2017
6d8d90f
Handling connection loss events
hiranya911 Oct 19, 2017
e276f9a
Adding documentation
hiranya911 Oct 19, 2017
c10753d
Removed string list reader
hiranya911 Oct 19, 2017
da73c79
Merged with latest master
hiranya911 Oct 20, 2017
193fcde
Merge branch 'master' into hkj-firestore-java
hiranya911 Oct 20, 2017
a6613ce
Merge branch 'hkj-firestore-java' into hkj-netty-rtdb
hiranya911 Oct 20, 2017
59d1f5a
Removing redundant whitespace
hiranya911 Oct 23, 2017
e03a896
Revert "Using the SDK ThreadManager for DB websocket threads"
hiranya911 Oct 24, 2017
b87f536
src/main/java/com/google/firebase/database/connection/util/StringList…
hiranya911 Oct 24, 2017
4d7c74a
Added StringListReader back
hiranya911 Oct 24, 2017
536eb61
Re-updated websocket client
hiranya911 Oct 24, 2017
83c4e8b
reverting more syntactic changes
hiranya911 Oct 24, 2017
06f5515
Adding new line
hiranya911 Oct 24, 2017
bc74249
Added documentation; Using a more secure trust manager; Other cleanup
hiranya911 Oct 26, 2017
8829759
Removed unused import
hiranya911 Oct 26, 2017
a3411d6
Using the port on URI when available
hiranya911 Oct 27, 2017
3adf4a7
Merged with master
hiranya911 Nov 10, 2017
884bd18
Removing websocket commpression handler
hiranya911 Nov 10, 2017
d4a1438
Declaring Netty dependencies explicitly (using the same version as gRPC)
hiranya911 Nov 22, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<skipUTs>${skipTests}</skipUTs>
<netty.version>4.1.14.Final</netty.version>
</properties>

<scm>
Expand Down Expand Up @@ -429,6 +430,21 @@
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${netty.version}</version>
</dependency>

<!-- Test Dependencies -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package com.google.firebase.database.connection;

import com.google.firebase.database.logging.Logger;
import com.google.firebase.database.tubesock.ThreadConfig;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;

public class ConnectionContext {

Expand All @@ -29,7 +29,7 @@ public class ConnectionContext {
private final boolean persistenceEnabled;
private final String clientSdkVersion;
private final String userAgent;
private final ThreadConfig threadConfig;
private final ThreadFactory threadFactory;

public ConnectionContext(
Logger logger,
Expand All @@ -38,14 +38,14 @@ public ConnectionContext(
boolean persistenceEnabled,
String clientSdkVersion,
String userAgent,
ThreadConfig threadConfig) {
ThreadFactory threadFactory) {
this.logger = logger;
this.authTokenProvider = authTokenProvider;
this.executorService = executorService;
this.persistenceEnabled = persistenceEnabled;
this.clientSdkVersion = clientSdkVersion;
this.userAgent = userAgent;
this.threadConfig = threadConfig;
this.threadFactory = threadFactory;
}

public Logger getLogger() {
Expand All @@ -72,7 +72,7 @@ public String getUserAgent() {
return this.userAgent;
}

public ThreadConfig getThreadConfig() {
return threadConfig;
public ThreadFactory getThreadFactory() {
return threadFactory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
package com.google.firebase.database.connection;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.base.Strings;
import com.google.firebase.internal.GaeThreadFactory;
import com.google.firebase.internal.RevivingScheduledExecutor;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;

import java.net.URI;
import java.security.KeyStore;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import javax.net.ssl.TrustManagerFactory;

/**
* A {@link WebsocketConnection.WSClient} implementation based on the Netty framework. Uses
* a single-threaded NIO event loop to read and write bytes from a WebSocket connection. Netty
* handles all the low-level IO, SSL and WebSocket handshake, and other protocol-specific details.
*
* <p>This implementation does not initiate connection close on its own. In case of errors or loss
* of connectivity, it notifies the higher layer ({@link WebsocketConnection}), which then decides
* whether to initiate a connection tear down.
*/
class NettyWebSocketClient implements WebsocketConnection.WSClient {

private static final int DEFAULT_WSS_PORT = 443;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the port from the URI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out if the URI string is specified as https://test.firebaseio.com (which is what most users do), URI.getPort() returns -1.


private final URI uri;
private final WebsocketConnection.WSClientEventHandler eventHandler;
private final ChannelHandler channelHandler;
private final ExecutorService executorService;
private final EventLoopGroup group;

private Channel channel;

NettyWebSocketClient(
URI uri, String userAgent, ThreadFactory threadFactory,
WebsocketConnection.WSClientEventHandler eventHandler) {
this.uri = checkNotNull(uri, "uri must not be null");
this.eventHandler = checkNotNull(eventHandler, "event handler must not be null");
this.channelHandler = new WebSocketClientHandler(uri, userAgent, eventHandler);
this.executorService = new RevivingScheduledExecutor(
threadFactory, "firebase-websocket-worker", GaeThreadFactory.isAvailable());
this.group = new NioEventLoopGroup(1, this.executorService);
}

@Override
public void connect() {
checkState(channel == null, "channel already initialized");
try {
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance(
TrustManagerFactory.getDefaultAlgorithm());
trustFactory.init((KeyStore) null);
final SslContext sslContext = SslContextBuilder.forClient()
.trustManager(trustFactory).build();
Bootstrap bootstrap = new Bootstrap();
final int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_WSS_PORT;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), port));
p.addLast(
new HttpClientCodec(),
// Set the max size for the HTTP responses. This only applies to the WebSocket
// handshake response from the server.
new HttpObjectAggregator(32 * 1024),
channelHandler);
}
});

ChannelFuture channelFuture = bootstrap.connect(uri.getHost(), port);
this.channel = channelFuture.channel();
channelFuture.addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
eventHandler.onError(future.cause());
}
}
}
);
} catch (Exception e) {
eventHandler.onError(e);
}
}

@Override
public void close() {
checkState(channel != null, "channel not initialized");
try {
channel.close();
} finally {
// The following may leave an active threadDeathWatcher daemon behind. That can be cleaned
// up at a higher level if necessary. See https://github.com/netty/netty/issues/7310.
group.shutdownGracefully();
executorService.shutdown();
}
}

@Override
public void send(String msg) {
checkState(channel != null && channel.isActive(), "channel not connected for sending");
channel.writeAndFlush(new TextWebSocketFrame(msg));
}

/**
* Handles low-level IO events. These events fire on the firebase-websocket-worker thread. We
* notify the {@link WebsocketConnection} on all events, which then hands them off to the
* RunLoop for further processing.
*/
private static class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> {

private final WebsocketConnection.WSClientEventHandler delegate;
private final WebSocketClientHandshaker handshaker;

WebSocketClientHandler(
URI uri, String userAgent, WebsocketConnection.WSClientEventHandler delegate) {
this.delegate = checkNotNull(delegate, "delegate must not be null");
checkArgument(!Strings.isNullOrEmpty(userAgent), "user agent must not be null or empty");
this.handshaker = WebSocketClientHandshakerFactory.newHandshaker(
uri, WebSocketVersion.V13, null, true,
new DefaultHttpHeaders().add("User-Agent", userAgent));
}

@Override
public void handlerAdded(ChannelHandlerContext context) {
// Do nothing
}

@Override
public void channelActive(ChannelHandlerContext context) {
handshaker.handshake(context.channel());
}

@Override
public void channelInactive(ChannelHandlerContext context) {
delegate.onClose();
}

@Override
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
Channel channel = context.channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just doing:

         try {
            if (message instanceof FullHttpResponse) {
              checkState(!handshaker.isHandshakeComplete());
              handshaker.finishHandshake(channel, (FullHttpResponse) message);
              delegate.onOpen();
            } else if (message instanceof TextWebSocketFrame) {
              delegate.onMessage(((TextWebSocketFrame) frame).text());
            } else {
              checkState(message instanceof CloseWebSocketFrame);
              delegate.onClose();
            }
          } catch (Exception e) {
            delegate.onError(e);
          }

Copy link
Contributor Author

@hiranya911 hiranya911 Oct 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks, looks much better now.

if (message instanceof FullHttpResponse) {
checkState(!handshaker.isHandshakeComplete());
try {
handshaker.finishHandshake(channel, (FullHttpResponse) message);
delegate.onOpen();
} catch (WebSocketHandshakeException e) {
delegate.onError(e);
}
} else if (message instanceof TextWebSocketFrame) {
delegate.onMessage(((TextWebSocketFrame) message).text());
} else {
checkState(message instanceof CloseWebSocketFrame);
delegate.onClose();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext context, final Throwable cause) {
delegate.onError(cause);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ public void onDisconnect(Connection.DisconnectReason reason) {
this.connectionState = ConnectionState.Disconnected;
this.realtime = null;
this.hasOnDisconnects = false;
requestCBHash.clear();
if (inactivityTimer != null) {
logger.debug("cancelling idle time checker");
inactivityTimer.cancel(false);
Expand Down
Loading