-
Notifications
You must be signed in to change notification settings - Fork 289
WebSocket Implementation Based on Netty #91
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…ase-admin-java into hkj-firestore-java
@mikelehen I simplified this PR as advised by removing some of the refactoring commits. It should be easier to review now. The PR states 23 files have been changed, but actually 15 of those files are total removals. Changes are localized to just 8 files. The class that has changed most is |
@@ -290,4 +284,4 @@ private String buildUserAgent(String platformAgent) { | |||
.append(platformAgent); | |||
return sb.toString(); | |||
} | |||
} | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tzzzzzz.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done :)
@@ -54,6 +55,8 @@ public static void destroy(Context ctx) { | |||
instance.destroyInternal(ctx); | |||
} finally { | |||
ctx.stop(); | |||
// TODO(hkj): https://github.com/netty/netty/issues/7310 | |||
FastThreadLocal.removeAll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you verify that this still works with multiple database clients, one shutting down and the other one in continued use?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested with a multi-app set up. Seems to work fine. After the first app has been deleted, the 2nd app continues to receive updates. I'll continue to test this scenario a bit deeply.
@@ -54,6 +55,8 @@ public static void destroy(Context ctx) { | |||
instance.destroyInternal(ctx); | |||
} finally { | |||
ctx.stop(); | |||
// TODO(hkj): https://github.com/netty/netty/issues/7310 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this issue was closed as intended, it might make sense to just summarize the problem here in a comment and remove the reference.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
@@ -88,6 +90,7 @@ public RevivingScheduledExecutor( | |||
final long initialDelayMs, | |||
final long timeoutMs) { | |||
super(0); | |||
checkNotNull(threadFactory, "thread factory must not be null"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% Nit: "threadFactory must not be null" or "Thread factory must not be null" (applies to other places as well)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
*/ | ||
class NettyWebSocketClient implements WebsocketConnection.WSClient { | ||
|
||
private static final int DEFAULT_WSS_PORT = 443; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the port from the URI?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It turns out if the URI string is specified as https://test.firebaseio.com
(which is what most users do), URI.getPort() returns -1.
|
||
@Override | ||
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception { | ||
Channel channel = context.channel(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about just doing:
try {
if (message instanceof FullHttpResponse) {
checkState(!handshaker.isHandshakeComplete());
handshaker.finishHandshake(channel, (FullHttpResponse) message);
delegate.onOpen();
} else if (message instanceof TextWebSocketFrame) {
delegate.onMessage(((TextWebSocketFrame) frame).text());
} else {
checkState(message instanceof CloseWebSocketFrame);
delegate.onClose();
}
} catch (Exception e) {
delegate.onError(e);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks, looks much better now.
logger.debug("Tubesock: " + msg); | ||
} | ||
} | ||
private static class StringList { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought for the longest time (up until this very line) that you were using an existing class. How much do we gain from having our own custom implementation, versus just doing it inline in the calling class?
And if you do decide to keep it, a small comment would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We were using a custom StringListReader
class for this purpose, which was far more complex than it needed to be. I just created a simpler version of it. Added a class comment to explain its purpose. I do believe the util class makes the core a bit more readable.
} | ||
|
||
@Override | ||
public void connect() { | ||
String combine() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This deservers a better name or at least a comment :) Thanks!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few issues and misc nits, but overall I think it looks reasonable. I'm still nervous about this change, so I'd recommend any extra testing you can do (in particular trying to hit error cases / reconnects, etc. that won't be exercised by our automated tests)
checkState(channel == null, "channel already initialized"); | ||
try { | ||
final SslContext sslContext = SslContextBuilder.forClient() | ||
.trustManager(InsecureTrustManagerFactory.INSTANCE).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this intentional? I assume this means we're not validating certs which seems undesirable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Used a proper trust manager. Thanks for pointing it out.
@Override | ||
protected void initChannel(SocketChannel ch) { | ||
ChannelPipeline p = ch.pipeline(); | ||
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), DEFAULT_WSS_PORT)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why aren't we using the port from the uri instance? (here and below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
URI.getPort()
returns -1 when the source URI string doesn't explicitly state the port.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am fine with using DEFAULT_WSS_PORT in that case, but if you explicitly specify a port, this code should respect it. It may not matter much in practice, since prod firebase uses 443, but if somebody on the RTDB team wanted to test against a localhost instance or something, they might need to use a different port.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fair enough. Changed to:
int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_WSS_PORT;
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), DEFAULT_WSS_PORT)); | ||
p.addLast( | ||
new HttpClientCodec(), | ||
new HttpObjectAggregator(8192), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did this 8192 come from? I can't remember what the maximum frame size we use is, but I thought it was at least 16k, and we should leave some buffer. (and please add a comment regarding the reasoning for the value chosen)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this only applies to the original handshake request. Once the HTTP connection has been upgraded to a 2-way WebSocket connection, this no longer has any effect. Having said that I've bumped this to 32K and also added a comment.
} finally { | ||
group.shutdownGracefully(); | ||
executorService.shutdown(); | ||
// TODO(hkj): https://github.com/netty/netty/issues/7310 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This issue is closed. Can you remove or improve this TODO?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception { | ||
Channel channel = context.channel(); | ||
if (!handshaker.isHandshakeComplete()) { | ||
checkState(message instanceof FullHttpResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is running on the firebase-websocket-worker thread, right? What happens when we throw an exception? Does it get handled, logged, etc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tested by injecting some exceptions. It fires the NettyWebSocketClient.exceptionCaught()
event, which we hand off to delegate.onError()
. The net result is, the existing connection gets closed gracefully, and the client moves on to the next connection attempt.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this check fails, that means there's a bug in this code and we should at minimum log it (and crashing the process wouldn't be out-of-the-question). So if this just silently retries without logging anything, then I don't think that's sufficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
delegate.onError()
logs the exception (delegate is WebsocketConnection.WSClientHandlerImpl
in this case):
if (e.getCause() != null && e.getCause() instanceof EOFException) {
logger.error("WebSocket reached EOF", e);
} else {
logger.error("WebSocket error", e);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
logger.debug("WebSocket reached EOF."); | ||
} else { | ||
logger.debug("WebSocket error.", e); | ||
} | ||
onClosed(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think NettyWebsocketConnection always closes the connection before calling onError, so I wonder if this should be closeAndNotify() instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is what this method (onClosed()
) does:
private void onClosed() {
if (!isClosed) {
if (logger.logsDebug()) {
logger.debug("closing itself");
}
closeAndNotify();
}
}
logger.debug("Tubesock: " + msg); | ||
} | ||
} | ||
private static class StringList { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you comment this a bit? It's a bit wonky since it's expecting a specific number of elements, unlike a normal list... You may want to make it a FrameBuffer or something to make it more obvious that it's special. But I don't feel strongly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added comments.
private final AtomicInteger remaining = new AtomicInteger(0); | ||
private List<String> buffer; | ||
|
||
void initialize(int capacity) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not make this the constructor and just create a new instance for each websocket message? I don't see a strong reason for this to be a reusable class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@Override | ||
public void send(String msg) { | ||
ws.send(msg); | ||
private final AtomicInteger remaining = new AtomicInteger(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why AtomicInteger? This class should only be accessed by a single thread, right? Using AtomicInteger suggests otherwise which makes the code less clear.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
@@ -54,6 +55,8 @@ public static void destroy(Context ctx) { | |||
instance.destroyInternal(ctx); | |||
} finally { | |||
ctx.stop(); | |||
// TODO(hkj): https://github.com/netty/netty/issues/7310 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this actionable or remove it. (but if you remove the TODO but keep the removeAll() call, please add a comment explaining what it's for)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated
Made the suggested changes. Over to you for the next round. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a couple more comments, but overall looks good. Thanks!
class WebsocketConnection { | ||
|
||
private static final long KEEP_ALIVE_TIMEOUT_MS = 45 * 1000; // 45 seconds | ||
private static final long CONNECT_TIMEOUT_MS = 30 * 1000; // 30 seconds | ||
private static final int MAX_FRAME_SIZE = 16384; | ||
private static long connectionId = 0; | ||
private static final AtomicLong CONN_ID = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mikelehen's comment regarding the suggested thread safety applies here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I almost called this out too, but actually since this is static it can technically be accessed by multiple threads (each Repo has its own run loop with its own thread and so these global connection ids could be assigned simultaneously on different threads). So I think AtomicLong is applicable here (although a race is unlikely in practice and wouldn't be particularly harmful, since the ID is just used for logging).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just poking around here, feel free to ignore.
// Set the max size for the HTTP responses. This only applies to the WebSocket | ||
// handshake response from the server. | ||
new HttpObjectAggregator(32 * 1024), | ||
WebSocketClientCompressionHandler.INSTANCE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't compress websocket frames and don't plan on it, so this probably isn't needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks @rockwotj
Java Admin SDK's RTDB implementation has been using a homegrown WebSocket implementation (the entire
com.google.firebase.databas.tubesock
package). I'm sure there were good reasons for doing it that way, but that code was pretty complicated and barely maintainable.Starting from #83 (Firestore integration), we have a dependency on Netty, which is a popular library with excellent WebSocket support. This PR gets rid of our homegrown WebSocket code, and implements it using Netty's
NioEventLoop
. This results in a net removal of around 1600 lines of code. I believe the final outcome is much simpler and far more maintainable.Unlike our old implementation which uses 2 threads (reader and writer) for WebSocket communication, Netty uses a single-threaded event loop. I have also tested this locally, and on GAE (java8) for several days straight without any problems.