Skip to content

WebSocket Implementation Based on Netty #91

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 39 commits into from
Nov 22, 2017
Merged

WebSocket Implementation Based on Netty #91

merged 39 commits into from
Nov 22, 2017

Conversation

hiranya911
Copy link
Contributor

@hiranya911 hiranya911 commented Oct 23, 2017

Java Admin SDK's RTDB implementation has been using a homegrown WebSocket implementation (the entire com.google.firebase.databas.tubesock package). I'm sure there were good reasons for doing it that way, but that code was pretty complicated and barely maintainable.

Starting from #83 (Firestore integration), we have a dependency on Netty, which is a popular library with excellent WebSocket support. This PR gets rid of our homegrown WebSocket code, and implements it using Netty's NioEventLoop. This results in a net removal of around 1600 lines of code. I believe the final outcome is much simpler and far more maintainable.

Unlike our old implementation which uses 2 threads (reader and writer) for WebSocket communication, Netty uses a single-threaded event loop. I have also tested this locally, and on GAE (java8) for several days straight without any problems.

@hiranya911
Copy link
Contributor Author

@mikelehen I simplified this PR as advised by removing some of the refactoring commits. It should be easier to review now. The PR states 23 files have been changed, but actually 15 of those files are total removals. Changes are localized to just 8 files.

The class that has changed most is WebsocketConnection. The others have fairly small changes.

@@ -290,4 +284,4 @@ private String buildUserAgent(String platformAgent) {
.append(platformAgent);
return sb.toString();
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tzzzzzz.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done :)

@@ -54,6 +55,8 @@ public static void destroy(Context ctx) {
instance.destroyInternal(ctx);
} finally {
ctx.stop();
// TODO(hkj): https://github.com/netty/netty/issues/7310
FastThreadLocal.removeAll();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you verify that this still works with multiple database clients, one shutting down and the other one in continued use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested with a multi-app set up. Seems to work fine. After the first app has been deleted, the 2nd app continues to receive updates. I'll continue to test this scenario a bit deeply.

@@ -54,6 +55,8 @@ public static void destroy(Context ctx) {
instance.destroyInternal(ctx);
} finally {
ctx.stop();
// TODO(hkj): https://github.com/netty/netty/issues/7310
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this issue was closed as intended, it might make sense to just summarize the problem here in a comment and remove the reference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@@ -88,6 +90,7 @@ public RevivingScheduledExecutor(
final long initialDelayMs,
final long timeoutMs) {
super(0);
checkNotNull(threadFactory, "thread factory must not be null");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% Nit: "threadFactory must not be null" or "Thread factory must not be null" (applies to other places as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

*/
class NettyWebSocketClient implements WebsocketConnection.WSClient {

private static final int DEFAULT_WSS_PORT = 443;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the port from the URI?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out if the URI string is specified as https://test.firebaseio.com (which is what most users do), URI.getPort() returns -1.


@Override
public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
Channel channel = context.channel();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about just doing:

         try {
            if (message instanceof FullHttpResponse) {
              checkState(!handshaker.isHandshakeComplete());
              handshaker.finishHandshake(channel, (FullHttpResponse) message);
              delegate.onOpen();
            } else if (message instanceof TextWebSocketFrame) {
              delegate.onMessage(((TextWebSocketFrame) frame).text());
            } else {
              checkState(message instanceof CloseWebSocketFrame);
              delegate.onClose();
            }
          } catch (Exception e) {
            delegate.onError(e);
          }

Copy link
Contributor Author

@hiranya911 hiranya911 Oct 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks, looks much better now.

logger.debug("Tubesock: " + msg);
}
}
private static class StringList {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought for the longest time (up until this very line) that you were using an existing class. How much do we gain from having our own custom implementation, versus just doing it inline in the calling class?

And if you do decide to keep it, a small comment would be nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were using a custom StringListReader class for this purpose, which was far more complex than it needed to be. I just created a simpler version of it. Added a class comment to explain its purpose. I do believe the util class makes the core a bit more readable.

}

@Override
public void connect() {
String combine() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deservers a better name or at least a comment :) Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment.

Copy link

@mikelehen mikelehen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few issues and misc nits, but overall I think it looks reasonable. I'm still nervous about this change, so I'd recommend any extra testing you can do (in particular trying to hit error cases / reconnects, etc. that won't be exercised by our automated tests)

checkState(channel == null, "channel already initialized");
try {
final SslContext sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE).build();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this intentional? I assume this means we're not validating certs which seems undesirable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used a proper trust manager. Thanks for pointing it out.

@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), DEFAULT_WSS_PORT));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why aren't we using the port from the uri instance? (here and below)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

URI.getPort() returns -1 when the source URI string doesn't explicitly state the port.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am fine with using DEFAULT_WSS_PORT in that case, but if you explicitly specify a port, this code should respect it. It may not matter much in practice, since prod firebase uses 443, but if somebody on the RTDB team wanted to test against a localhost instance or something, they might need to use a different port.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough. Changed to:

int port = uri.getPort() != -1 ? uri.getPort() : DEFAULT_WSS_PORT;

p.addLast(sslContext.newHandler(ch.alloc(), uri.getHost(), DEFAULT_WSS_PORT));
p.addLast(
new HttpClientCodec(),
new HttpObjectAggregator(8192),

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this 8192 come from? I can't remember what the maximum frame size we use is, but I thought it was at least 16k, and we should leave some buffer. (and please add a comment regarding the reasoning for the value chosen)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this only applies to the original handshake request. Once the HTTP connection has been upgraded to a 2-way WebSocket connection, this no longer has any effect. Having said that I've bumped this to 32K and also added a comment.

} finally {
group.shutdownGracefully();
executorService.shutdown();
// TODO(hkj): https://github.com/netty/netty/issues/7310

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is closed. Can you remove or improve this TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public void channelRead0(ChannelHandlerContext context, Object message) throws Exception {
Channel channel = context.channel();
if (!handshaker.isHandshakeComplete()) {
checkState(message instanceof FullHttpResponse);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is running on the firebase-websocket-worker thread, right? What happens when we throw an exception? Does it get handled, logged, etc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tested by injecting some exceptions. It fires the NettyWebSocketClient.exceptionCaught() event, which we hand off to delegate.onError(). The net result is, the existing connection gets closed gracefully, and the client moves on to the next connection attempt.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this check fails, that means there's a bug in this code and we should at minimum log it (and crashing the process wouldn't be out-of-the-question). So if this just silently retries without logging anything, then I don't think that's sufficient.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delegate.onError() logs the exception (delegate is WebsocketConnection.WSClientHandlerImpl in this case):

      if (e.getCause() != null && e.getCause() instanceof EOFException) {
        logger.error("WebSocket reached EOF", e);
      } else {
        logger.error("WebSocket error", e);
      }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

logger.debug("WebSocket reached EOF.");
} else {
logger.debug("WebSocket error.", e);
}
onClosed();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think NettyWebsocketConnection always closes the connection before calling onError, so I wonder if this should be closeAndNotify() instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is what this method (onClosed()) does:

private void onClosed() {
    if (!isClosed) {
      if (logger.logsDebug()) {
        logger.debug("closing itself");
      }
      closeAndNotify();
    }
  }

logger.debug("Tubesock: " + msg);
}
}
private static class StringList {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment this a bit? It's a bit wonky since it's expecting a specific number of elements, unlike a normal list... You may want to make it a FrameBuffer or something to make it more obvious that it's special. But I don't feel strongly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comments.

private final AtomicInteger remaining = new AtomicInteger(0);
private List<String> buffer;

void initialize(int capacity) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not make this the constructor and just create a new instance for each websocket message? I don't see a strong reason for this to be a reusable class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@Override
public void send(String msg) {
ws.send(msg);
private final AtomicInteger remaining = new AtomicInteger(0);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why AtomicInteger? This class should only be accessed by a single thread, right? Using AtomicInteger suggests otherwise which makes the code less clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -54,6 +55,8 @@ public static void destroy(Context ctx) {
instance.destroyInternal(ctx);
} finally {
ctx.stop();
// TODO(hkj): https://github.com/netty/netty/issues/7310

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make this actionable or remove it. (but if you remove the TODO but keep the removeAll() call, please add a comment explaining what it's for)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

@hiranya911
Copy link
Contributor Author

Made the suggested changes. Over to you for the next round.

@hiranya911 hiranya911 assigned mikelehen and unassigned hiranya911 Oct 26, 2017
Copy link

@mikelehen mikelehen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a couple more comments, but overall looks good. Thanks!

class WebsocketConnection {

private static final long KEEP_ALIVE_TIMEOUT_MS = 45 * 1000; // 45 seconds
private static final long CONNECT_TIMEOUT_MS = 30 * 1000; // 30 seconds
private static final int MAX_FRAME_SIZE = 16384;
private static long connectionId = 0;
private static final AtomicLong CONN_ID = new AtomicLong(0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mikelehen's comment regarding the suggested thread safety applies here as well.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I almost called this out too, but actually since this is static it can technically be accessed by multiple threads (each Repo has its own run loop with its own thread and so these global connection ids could be assigned simultaneously on different threads). So I think AtomicLong is applicable here (although a race is unlikely in practice and wouldn't be particularly harmful, since the ID is just used for logging).

Copy link

@rockwotj rockwotj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just poking around here, feel free to ignore.

// Set the max size for the HTTP responses. This only applies to the WebSocket
// handshake response from the server.
new HttpObjectAggregator(32 * 1024),
WebSocketClientCompressionHandler.INSTANCE,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't compress websocket frames and don't plan on it, so this probably isn't needed

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks @rockwotj

@hiranya911 hiranya911 changed the base branch from hkj-firestore-java to master November 22, 2017 00:45
@hiranya911 hiranya911 merged commit 7187f05 into master Nov 22, 2017
@hiranya911 hiranya911 deleted the hkj-netty-rtdb branch November 22, 2017 00:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants