-
Notifications
You must be signed in to change notification settings - Fork 289
WebSocket Implementation Based on Netty #91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
705abcd
Prototyping Firestore integration for Java
hiranya911 084453e
Some code cleanup
hiranya911 39dec4c
Updating comments
hiranya911 cd8030c
Code cleanup
hiranya911 9be05c9
Merge branch 'master' into hkj-firestore-java
hiranya911 dfc6bec
Removed getInstance() methods from FirestoreClient and added getFires…
hiranya911 ef3881f
Merge branch 'hkj-firestore-java' of github.com:FirebasePrivate/fireb…
hiranya911 0cf020b
Merged with master
hiranya911 ab3469a
Minor cleanup
hiranya911 d64b4f6
Adding Firestore OAuth scopes
hiranya911 d87f364
Merge branch 'master' into hkj-firestore-java
hiranya911 8ed4e5f
Some documentation updates
hiranya911 48104da
Reverted a doc change
hiranya911 1771a01
Updating GCS dependency version (#89)
hiranya911 4201ff1
Netty-based websocket impl
hiranya911 7e7f694
Refactored the Netty stuff to a separate util class
hiranya911 7a74047
More code clean up and removing tubesock package
hiranya911 8e90610
Using the SDK ThreadManager for DB websocket threads
hiranya911 b910380
More code cleanup and error checks
hiranya911 a227cae
Handling connection drop events
hiranya911 6d8d90f
Handling connection loss events
hiranya911 e276f9a
Adding documentation
hiranya911 c10753d
Removed string list reader
hiranya911 da73c79
Merged with latest master
hiranya911 193fcde
Merge branch 'master' into hkj-firestore-java
hiranya911 a6613ce
Merge branch 'hkj-firestore-java' into hkj-netty-rtdb
hiranya911 59d1f5a
Removing redundant whitespace
hiranya911 e03a896
Revert "Using the SDK ThreadManager for DB websocket threads"
hiranya911 b87f536
src/main/java/com/google/firebase/database/connection/util/StringList…
hiranya911 4d7c74a
Added StringListReader back
hiranya911 536eb61
Re-updated websocket client
hiranya911 83c4e8b
reverting more syntactic changes
hiranya911 06f5515
Adding new line
hiranya911 bc74249
Added documentation; Using a more secure trust manager; Other cleanup
hiranya911 8829759
Removed unused import
hiranya911 a3411d6
Using the port on URI when available
hiranya911 3adf4a7
Merged with master
hiranya911 884bd18
Removing websocket commpression handler
hiranya911 d4a1438
Declaring Netty dependencies explicitly (using the same version as gRPC)
hiranya911 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
195 changes: 195 additions & 0 deletions
195
src/main/java/com/google/firebase/database/connection/NettyWebSocketClient.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package com.google.firebase.database.connection; | ||
|
||
import static com.google.common.base.Preconditions.checkArgument; | ||
import static com.google.common.base.Preconditions.checkNotNull; | ||
import static com.google.common.base.Preconditions.checkState; | ||
|
||
import com.google.common.base.Strings; | ||
import com.google.firebase.internal.GaeThreadFactory; | ||
import com.google.firebase.internal.RevivingScheduledExecutor; | ||
import io.netty.bootstrap.Bootstrap; | ||
import io.netty.channel.Channel; | ||
import io.netty.channel.ChannelFuture; | ||
import io.netty.channel.ChannelFutureListener; | ||
import io.netty.channel.ChannelHandler; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.ChannelInitializer; | ||
import io.netty.channel.ChannelPipeline; | ||
import io.netty.channel.EventLoopGroup; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import io.netty.channel.nio.NioEventLoopGroup; | ||
import io.netty.channel.socket.SocketChannel; | ||
import io.netty.channel.socket.nio.NioSocketChannel; | ||
import io.netty.handler.codec.http.DefaultHttpHeaders; | ||
import io.netty.handler.codec.http.FullHttpResponse; | ||
import io.netty.handler.codec.http.HttpClientCodec; | ||
import io.netty.handler.codec.http.HttpObjectAggregator; | ||
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; | ||
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; | ||
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; | ||
import io.netty.handler.codec.http.websocketx.WebSocketHandshakeException; | ||
import io.netty.handler.codec.http.websocketx.WebSocketVersion; | ||
import io.netty.handler.ssl.SslContext; | ||
import io.netty.handler.ssl.SslContextBuilder; | ||
|
||
import java.net.URI; | ||
import java.security.KeyStore; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.ThreadFactory; | ||
import javax.net.ssl.TrustManagerFactory; | ||
|
||
/** | ||
* A {@link WebsocketConnection.WSClient} implementation based on the Netty framework. Uses | ||
* a single-threaded NIO event loop to read and write bytes from a WebSocket connection. Netty | ||
* handles all the low-level IO, SSL and WebSocket handshake, and other protocol-specific details. | ||
* | ||
* <p>This implementation does not initiate connection close on its own. In case of errors or loss | ||
* of connectivity, it notifies the higher layer ({@link WebsocketConnection}), which then decides | ||
* whether to initiate a connection tear down. | ||
*/ | ||
class NettyWebSocketClient implements WebsocketConnection.WSClient { | ||
|
||
private static final int DEFAULT_WSS_PORT = 443; | ||
|
||
private final URI uri; | ||
private final WebsocketConnection.WSClientEventHandler eventHandler; | ||
private final ChannelHandler channelHandler; | ||
private final ExecutorService executorService; | ||
private final EventLoopGroup group; | ||
|
||
private Channel channel; | ||
|
||
NettyWebSocketClient( | ||
URI uri, String userAgent, ThreadFactory threadFactory, | ||
WebsocketConnection.WSClientEventHandler eventHandler) { | ||
this.uri = checkNotNull(uri, "uri must not be null"); | ||
this.eventHandler = checkNotNull(eventHandler, "event handler must not be null"); | ||
this.channelHandler = new WebSocketClientHandler(uri, userAgent, eventHandler); | ||
this.executorService = new RevivingScheduledExecutor( | ||
threadFactory, "firebase-websocket-worker", GaeThreadFactory.isAvailable()); | ||
this.group = new NioEventLoopGroup(1, this.executorService); | ||
} | ||
|
||
@Override | ||
public void connect() { | ||
checkState(channel == null, "channel already initialized"); | ||
try { | ||
TrustManagerFactory trustFactory = TrustManagerFactory.getInstance( | ||
TrustManagerFactory.getDefaultAlgorithm()); | ||
trustFactory.init((KeyStore) null); | ||
final SslContext sslContext = SslContextBuilder.forClient() | ||
.trustManager(trustFactory).build(); | ||
Bootstrap bootstrap = new Bootstrap(); | ||
final int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_WSS_PORT; | ||
bootstrap.group(group) | ||
.channel(NioSocketChannel.class) | ||
.handler(new ChannelInitializer<SocketChannel>() { | ||
@Override | ||
protected void initChannel(SocketChannel ch) { | ||
ChannelPipeline p = ch.pipeline(); | ||
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), port)); | ||
p.addLast( | ||
new HttpClientCodec(), | ||
// Set the max size for the HTTP responses. This only applies to the WebSocket | ||
// handshake response from the server. | ||
new HttpObjectAggregator(32 * 1024), | ||
channelHandler); | ||
} | ||
}); | ||
|
||
ChannelFuture channelFuture = bootstrap.connect(uri.getHost(), port); | ||
this.channel = channelFuture.channel(); | ||
channelFuture.addListener( | ||
new ChannelFutureListener() { | ||
@Override | ||
public void operationComplete(ChannelFuture future) throws Exception { | ||
if (!future.isSuccess()) { | ||
eventHandler.onError(future.cause()); | ||
} | ||
} | ||
} | ||
); | ||
} catch (Exception e) { | ||
eventHandler.onError(e); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
checkState(channel != null, "channel not initialized"); | ||
try { | ||
channel.close(); | ||
} finally { | ||
// The following may leave an active threadDeathWatcher daemon behind. That can be cleaned | ||
// up at a higher level if necessary. See https://github.com/netty/netty/issues/7310. | ||
group.shutdownGracefully(); | ||
executorService.shutdown(); | ||
} | ||
} | ||
|
||
@Override | ||
public void send(String msg) { | ||
checkState(channel != null && channel.isActive(), "channel not connected for sending"); | ||
channel.writeAndFlush(new TextWebSocketFrame(msg)); | ||
} | ||
|
||
/** | ||
* Handles low-level IO events. These events fire on the firebase-websocket-worker thread. We | ||
* notify the {@link WebsocketConnection} on all events, which then hands them off to the | ||
* RunLoop for further processing. | ||
*/ | ||
private static class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { | ||
|
||
private final WebsocketConnection.WSClientEventHandler delegate; | ||
private final WebSocketClientHandshaker handshaker; | ||
|
||
WebSocketClientHandler( | ||
URI uri, String userAgent, WebsocketConnection.WSClientEventHandler delegate) { | ||
this.delegate = checkNotNull(delegate, "delegate must not be null"); | ||
checkArgument(!Strings.isNullOrEmpty(userAgent), "user agent must not be null or empty"); | ||
this.handshaker = WebSocketClientHandshakerFactory.newHandshaker( | ||
uri, WebSocketVersion.V13, null, true, | ||
new DefaultHttpHeaders().add("User-Agent", userAgent)); | ||
} | ||
|
||
@Override | ||
public void handlerAdded(ChannelHandlerContext context) { | ||
// Do nothing | ||
} | ||
|
||
@Override | ||
public void channelActive(ChannelHandlerContext context) { | ||
handshaker.handshake(context.channel()); | ||
} | ||
|
||
@Override | ||
public void channelInactive(ChannelHandlerContext context) { | ||
delegate.onClose(); | ||
} | ||
|
||
@Override | ||
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception { | ||
Channel channel = context.channel(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about just doing:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Thanks, looks much better now. |
||
if (message instanceof FullHttpResponse) { | ||
checkState(!handshaker.isHandshakeComplete()); | ||
try { | ||
handshaker.finishHandshake(channel, (FullHttpResponse) message); | ||
delegate.onOpen(); | ||
} catch (WebSocketHandshakeException e) { | ||
delegate.onError(e); | ||
} | ||
} else if (message instanceof TextWebSocketFrame) { | ||
delegate.onMessage(((TextWebSocketFrame) message).text()); | ||
} else { | ||
checkState(message instanceof CloseWebSocketFrame); | ||
delegate.onClose(); | ||
} | ||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext context, final Throwable cause) { | ||
delegate.onError(cause); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the port from the URI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It turns out if the URI string is specified as
https://test.firebaseio.com
(which is what most users do), URI.getPort() returns -1.