Fixing a Thread Leak in Database Client Code (Related to #29) #30

Merged
merged 2 commits into from
May 18, 2017

@@ -139,7 +139,7 @@ public PersistentConnectionImpl(
.withMinDelayAfterFailure(1000)
.withRetryExponent(1.3)
.withMaxDelay(30 * 1000)
-.withJitterFactor(0.7)
+.withJitterFactor(0.25)

FWIW, I mildly object to changing this since:

  1. I don't see how playing with timing numbers is going to fix it... It's like adding a sleep to fix a race condition... It may work (sometimes), but it's not a good long-term fix. :-)

  2. The goal of the jitter is to "smear" reconnecting clients over a larger period of time so they don't all hit the backend at the same times after a server restarts / has downtime / whatever. So in theory, this change would more than double the load on the server, since we're smearing over 25% of the time interval rather than 70%, and so we should talk to the backend team to make sure they're okay with this change.

But #1 isn't harmful and in practice #2 is likely inconsequential because the number of clients using the admin SDK will be extremely small compared to the number of clients using the mobile / web clients. So backend load isn't really an issue, and so if we want more predictable reconnect intervals, I guess it's fine. :-)
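
To make the smearing argument concrete, here is a minimal Java sketch. It assumes jitter is applied as `(1 - jitterFactor) * base + jitterFactor * base * U`, with `U` uniform in [0, 1). That formula and the names `JitterSketch`, `jitteredDelay`, and `windowMillis` are illustrative assumptions, not code from this PR.

```java
import java.util.Random;

// Illustrative only: the jitter formula below is an assumption, not the SDK's code.
final class JitterSketch {

  // Returns a delay of roughly (1 - jitterFactor) * base up to base.
  static long jitteredDelay(long baseMillis, double jitterFactor, Random random) {
    double fixedPart = (1.0 - jitterFactor) * baseMillis;
    double smearedPart = jitterFactor * baseMillis * random.nextDouble();
    return (long) (fixedPart + smearedPart);
  }

  // Width of the interval over which reconnecting clients are spread.
  static long windowMillis(long baseMillis, double jitterFactor) {
    return Math.round(jitterFactor * baseMillis);
  }

  public static void main(String[] args) {
    long base = 30 * 1000; // the max delay configured in the hunk above
    System.out.println("window at 0.7:  " + windowMillis(base, 0.7) + " ms");
    System.out.println("window at 0.25: " + windowMillis(base, 0.25) + " ms");
    System.out.println("sample delay:   " + jitteredDelay(base, 0.25, new Random()) + " ms");
  }
}
```

Under that assumption, at the 30-second maximum delay the reconnects are spread over about 21 s with a factor of 0.7 but only 7.5 s with 0.25, which is where the "more than double the load" estimate comes from (21 / 7.5 = 2.8).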

Contributor Author

It seems both thread creation and termination are very expensive in App Engine. Both involve RPC calls and heavy backend processing. One implication is that threads apparently don't die as soon as our Runnables return: some internal cleanup still has to happen on the GAE end. Leaving some breathing room between thread termination and creation therefore helps. In my tests I observed a noticeable difference in error handling behavior when the reconnects are far apart in time.

However, those tests were conducted using an exponential backoff factor of 2. So perhaps dropping the jitter factor alone may not be enough. I'll revert this change for now. We can revisit it in the future, if we continue to have issues in GAE.

FWIW-

  1. To test your other changes, it might be interesting to set the initial delay to 0 and the backoff factor to 1 temporarily to make sure reconnect timing is no longer a factor (obviously don't leave it deployed like this for an extended period of time :-)).

  2. I'd be more comfortable increasing the initial reconnect time than the backoff factor... and if we decide that GAE is weird enough that we want to minimize the thread creation for reconnect attempts, we could change these settings. But I'd at least accompany it with some comments explaining the rationale for the numbers (i.e. have a specific target for the frequency with which we will tolerate creating / destroying threads).
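
One way to put a number on "how often do we create / destroy threads" is to count reconnect attempts inside a fixed window for a given set of backoff settings. The sketch below is a rough estimate only: it ignores jitter, assumes one thread creation per reconnect attempt, and the names `ReconnectRateSketch` and `attemptsWithin` are hypothetical.

```java
// Illustrative only: estimates reconnect attempts per window, ignoring jitter.
final class ReconnectRateSketch {

  // Counts how many attempts fit into windowMillis when the delay starts at
  // minDelayMillis, is multiplied by exponent after each attempt, and is
  // capped at maxDelayMillis.
  static int attemptsWithin(
      long windowMillis, long minDelayMillis, double exponent, long maxDelayMillis) {
    if (minDelayMillis <= 0 || exponent < 1.0 || maxDelayMillis < minDelayMillis) {
      // A zero delay would mean back-to-back attempts, which this model cannot count.
      throw new IllegalArgumentException("need minDelay > 0, exponent >= 1, maxDelay >= minDelay");
    }
    double delay = minDelayMillis;
    double elapsed = 0;
    int attempts = 0;
    while (elapsed + delay <= windowMillis) {
      elapsed += delay;
      attempts++;
      delay = Math.min(delay * exponent, maxDelayMillis);
    }
    return attempts;
  }

  public static void main(String[] args) {
    long window = 60 * 1000;
    // Settings from the hunk above: 1000 ms minimum, exponent 1.3, 30 s cap.
    System.out.println("exponent 1.3: " + attemptsWithin(window, 1000, 1.3, 30 * 1000));
    // The exponent the author says the GAE tests used.
    System.out.println("exponent 2.0: " + attemptsWithin(window, 1000, 2.0, 30 * 1000));
  }
}
```

A comment next to the chosen settings could then state the target directly, for example the maximum number of attempts per minute that is considered acceptable on GAE.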

.build();

long connId = connectionIds++;

47 changes: 26 additions & 21 deletions src/main/java/com/google/firebase/database/tubesock/WebSocket.java
@@ -16,6 +16,8 @@

package com.google.firebase.database.tubesock;

+import static com.google.common.base.Preconditions.checkState;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -64,7 +66,7 @@ public void setName(Thread t, String name) {
private final WebSocketWriter writer;
private final WebSocketHandshake handshake;
private final int clientId = clientCount.incrementAndGet();
-private final Thread innerThread;
+private Thread innerThread;
private volatile State state = State.NONE;
private volatile Socket socket = null;
private WebSocketEventHandler eventHandler = null;
@@ -99,15 +101,6 @@ public WebSocket(URI url, String protocol) {
* if not extra headers are requested
*/
public WebSocket(URI url, String protocol, Map<String, String> extraHeaders) {
-innerThread =
-getThreadFactory()
-.newThread(
-new Runnable() {
-@Override
-public void run() {
-runReader();
-}
-});
this.url = url;
handshake = new WebSocketHandshake(url, protocol, extraHeaders);
receiver = new WebSocketReceiver(this);
@@ -150,9 +143,8 @@ public synchronized void connect() {
close();
return;
}
-getIntializer().setName(getInnerThread(), THREAD_BASE_NAME + "Reader-" + clientId);
state = State.CONNECTING;
-getInnerThread().start();
+start();
}

nit: Since the code style here seems to be to mix public and private methods, putting helpers close to where they're used, I'd put the start() method directly below here.

Contributor Author

Done


/**
@@ -312,13 +304,15 @@ private Socket createSocket() {
* convenience method to make sure everything shuts down, if desired.
*/
public void blockClose() throws InterruptedException {
-// If the thread is new, it will never run, since we closed the connection before we
-// actually
-// connected
-if (writer.getInnerThread().getState() != Thread.State.NEW) {
-writer.getInnerThread().join();
+writer.waitFor();
+Thread thread;
+synchronized (this) {
+if (innerThread == null) {
+return;
+}
+thread = innerThread;
}
-getInnerThread().join();
+thread.join();
}

private void runReader() {
@@ -389,7 +383,7 @@ private void runReader() {
writer.setOutput(output);
receiver.setInput(input);
state = WebSocket.State.CONNECTED;
-writer.getInnerThread().start();
+writer.start();
eventHandler.onOpen();
receiver.run();
} catch (WebSocketException wse) {
@@ -402,8 +396,19 @@ private void runReader() {
}
}

-Thread getInnerThread() {
-return innerThread;
+private synchronized void start() {
+checkState(innerThread == null, "Inner thread already started");
+innerThread =
+getThreadFactory()
+.newThread(
+new Runnable() {
+@Override
+public void run() {
+runReader();
+}
+});
+getIntializer().setName(innerThread, THREAD_BASE_NAME + "Reader-" + clientId);
+innerThread.start();
}

private enum State {

@@ -47,6 +47,9 @@ void run() {
this.eventHandler = websocket.getEventHandler();
while (!stop) {
try {
+if (Thread.interrupted()) {
+throw new InterruptedException();
+}
int offset = 0;
offset += read(inputHeader, offset, 1);
boolean fin = (inputHeader[0] & 0x80) != 0;
@@ -94,6 +97,8 @@ void run() {
continue;
} catch (IOException ioe) {
handleError(new WebSocketException("IO Error", ioe));
+} catch (InterruptedException e) {
+handleError(new WebSocketException("Receiver interrupted", e));
} catch (WebSocketException e) {
handleError(e);
}
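
The interrupt check added to the receive loop above follows a common cooperative-cancellation pattern. A minimal standalone sketch, with a hypothetical class `InterruptibleLoopSketch` and a counter standing in for the real frame-reading work:

```java
// Illustrative only: a loop that notices a pending interrupt at the top of each pass.
final class InterruptibleLoopSketch {

  // Runs up to maxIterations passes and reports how the loop ended.
  static String runLoop(int maxIterations) {
    int done = 0;
    while (done < maxIterations) {
      try {
        // Thread.interrupted() returns the current thread's interrupt flag
        // and clears it, so the exception is raised at most once per interrupt.
        if (Thread.interrupted()) {
          throw new InterruptedException();
        }
        done++; // stand-in for reading and dispatching one frame
      } catch (InterruptedException e) {
        return "interrupted after " + done + " iterations";
      }
    }
    return "completed " + done + " iterations";
  }

  public static void main(String[] args) {
    Thread.currentThread().interrupt(); // simulate a pending interrupt
    System.out.println(runLoop(5));
    System.out.println(runLoop(5)); // the flag was cleared by the first call
  }
}
```

Because `Thread.interrupted()` clears the flag, code further up the stack will not see the interrupt unless the handler reports it some other way, which the diff does by passing a `WebSocketException` to `handleError`.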

@@ -16,6 +16,8 @@

package com.google.firebase.database.tubesock;

+import static com.google.common.base.Preconditions.checkState;

import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
@@ -32,26 +34,20 @@
class WebSocketWriter {

private final Random random = new Random();
-private final Thread innerThread;
+private final String threadBaseName;
+private final int clientId;

Rather than add 2 members for these kinda' arbitrary values, can we just add a threadName member that we still compute in the constructor?

Contributor Author

Done


+private Thread innerThread;
private BlockingQueue<ByteBuffer> pendingBuffers;
private volatile boolean stop = false;
private boolean closeSent = false;
private WebSocket websocket;
private WritableByteChannel channel;

WebSocketWriter(WebSocket websocket, String threadBaseName, int clientId) {
-innerThread =
-WebSocket.getThreadFactory()
-.newThread(
-new Runnable() {
-@Override
-public void run() {
-runWriter();
-}
-});

-WebSocket.getIntializer().setName(getInnerThread(), threadBaseName + "Writer-" + clientId);
this.websocket = websocket;
+this.threadBaseName = threadBaseName;
+this.clientId = clientId;
pendingBuffers = new LinkedBlockingQueue<>();
}

@@ -166,7 +162,31 @@ private void runWriter() {
}
}

-Thread getInnerThread() {
-return innerThread;
+synchronized void start() {
+checkState(innerThread == null, "Inner thread already started");
+innerThread =
+WebSocket.getThreadFactory()
+.newThread(
+new Runnable() {
+@Override
+public void run() {
+runWriter();
+}
+});
+WebSocket.getIntializer().setName(innerThread, threadBaseName + "Writer-" + clientId);
+innerThread.start();
+}

+void waitFor() throws InterruptedException {

waitFor... what? waitForThreadExit() maybe?

Contributor Author

Changed to waitForTermination

+// If the thread is null, it will never be instantiated, since we closed the connection
+// before we actually connected.
+Thread thread;
+synchronized (this) {
+if (innerThread == null) {
+return;
+}
+thread = innerThread;
+}
+thread.join();
}
}
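
Both files in this PR use the same pattern: the thread is created lazily in `start()` rather than in the constructor, so a connection that is closed before it connects never allocates a thread. The sketch below isolates that pattern under simplifying assumptions. `LazyWorker` is a hypothetical class, it uses `new Thread(...)` instead of the SDK's thread factory, and it throws `IllegalStateException` directly instead of calling Guava's `checkState`.

```java
// Illustrative only: lazy thread creation with a null-safe wait.
final class LazyWorker {
  private final Runnable task;
  private Thread innerThread; // guarded by "this"; stays null until start()

  LazyWorker(Runnable task) {
    this.task = task;
  }

  // Creates and starts the thread on first use; a second call is a bug.
  synchronized void start() {
    if (innerThread != null) {
      throw new IllegalStateException("Inner thread already started");
    }
    innerThread = new Thread(task, "LazyWorker");
    innerThread.start();
  }

  // Returns immediately if start() was never called; otherwise waits for exit.
  void waitForTermination() throws InterruptedException {
    Thread thread;
    synchronized (this) {
      if (innerThread == null) {
        return;
      }
      thread = innerThread;
    }
    thread.join(); // join outside the lock so start() callers are not blocked
  }

  public static void main(String[] args) throws InterruptedException {
    LazyWorker neverStarted = new LazyWorker(() -> System.out.println("not printed"));
    neverStarted.waitForTermination(); // no thread was ever created

    LazyWorker worker = new LazyWorker(() -> System.out.println("worker ran"));
    worker.start();
    worker.waitForTermination();
  }
}
```

Copying the field into a local inside the `synchronized` block and joining outside it mirrors the `blockClose()` and `waitFor()` changes in the diff.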