Fixing a Thread Leak in Database Client Code (Related to #29) #30
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -16,6 +16,8 @@
 
 package com.google.firebase.database.tubesock;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -64,7 +66,7 @@ public void setName(Thread t, String name) {
   private final WebSocketWriter writer;
   private final WebSocketHandshake handshake;
   private final int clientId = clientCount.incrementAndGet();
-  private final Thread innerThread;
+  private Thread innerThread;
   private volatile State state = State.NONE;
   private volatile Socket socket = null;
   private WebSocketEventHandler eventHandler = null;
@@ -99,15 +101,6 @@ public WebSocket(URI url, String protocol) {
    * if not extra headers are requested
    */
   public WebSocket(URI url, String protocol, Map<String, String> extraHeaders) {
-    innerThread =
-        getThreadFactory()
-            .newThread(
-                new Runnable() {
-                  @Override
-                  public void run() {
-                    runReader();
-                  }
-                });
     this.url = url;
     handshake = new WebSocketHandshake(url, protocol, extraHeaders);
     receiver = new WebSocketReceiver(this);
@@ -150,9 +143,8 @@ public synchronized void connect() {
       close();
       return;
     }
-    getIntializer().setName(getInnerThread(), THREAD_BASE_NAME + "Reader-" + clientId);
     state = State.CONNECTING;
-    getInnerThread().start();
+    start();
   }
Review comment: nit: Since the code style here seems to mix public and private methods, putting helpers close to where they're used, I'd put the start() method directly below here.
Reply: Done
 
   /**
@@ -312,13 +304,15 @@ private Socket createSocket() {
    * convenience method to make sure everything shuts down, if desired.
    */
   public void blockClose() throws InterruptedException {
-    // If the thread is new, it will never run, since we closed the connection before we
-    // actually
-    // connected
-    if (writer.getInnerThread().getState() != Thread.State.NEW) {
-      writer.getInnerThread().join();
+    writer.waitFor();
+    Thread thread;
+    synchronized (this) {
+      if (innerThread == null) {
+        return;
+      }
+      thread = innerThread;
     }
-    getInnerThread().join();
+    thread.join();
   }
 
   private void runReader() {
@@ -389,7 +383,7 @@ private void runReader() {
       writer.setOutput(output);
       receiver.setInput(input);
       state = WebSocket.State.CONNECTED;
-      writer.getInnerThread().start();
+      writer.start();
       eventHandler.onOpen();
       receiver.run();
     } catch (WebSocketException wse) {
@@ -402,8 +396,19 @@ private void runReader() {
     }
   }
 
-  Thread getInnerThread() {
-    return innerThread;
+  private synchronized void start() {
+    checkState(innerThread == null, "Inner thread already started");
+    innerThread =
+        getThreadFactory()
+            .newThread(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    runReader();
+                  }
+                });
+    getIntializer().setName(innerThread, THREAD_BASE_NAME + "Reader-" + clientId);
+    innerThread.start();
   }
 
   private enum State {
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -16,6 +16,8 @@
 
 package com.google.firebase.database.tubesock;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -32,26 +34,20 @@
 class WebSocketWriter {
 
   private final Random random = new Random();
-  private final Thread innerThread;
+  private final String threadBaseName;
+  private final int clientId;
Review comment: Rather than add 2 members for these kinda' arbitrary values, can we just add a threadName member that we still compute in the constructor?
Reply: Done
+
+  private Thread innerThread;
   private BlockingQueue<ByteBuffer> pendingBuffers;
   private volatile boolean stop = false;
   private boolean closeSent = false;
   private WebSocket websocket;
   private WritableByteChannel channel;
 
   WebSocketWriter(WebSocket websocket, String threadBaseName, int clientId) {
-    innerThread =
-        WebSocket.getThreadFactory()
-            .newThread(
-                new Runnable() {
-                  @Override
-                  public void run() {
-                    runWriter();
-                  }
-                });
-
-    WebSocket.getIntializer().setName(getInnerThread(), threadBaseName + "Writer-" + clientId);
     this.websocket = websocket;
+    this.threadBaseName = threadBaseName;
+    this.clientId = clientId;
     pendingBuffers = new LinkedBlockingQueue<>();
   }
 
@@ -166,7 +162,31 @@ private void runWriter() {
     }
   }
 
-  Thread getInnerThread() {
-    return innerThread;
+  synchronized void start() {
+    checkState(innerThread == null, "Inner thread already started");
+    innerThread =
+        WebSocket.getThreadFactory()
+            .newThread(
+                new Runnable() {
+                  @Override
+                  public void run() {
+                    runWriter();
+                  }
+                });
+    WebSocket.getIntializer().setName(innerThread, threadBaseName + "Writer-" + clientId);
+    innerThread.start();
   }
+
+  void waitFor() throws InterruptedException {
Review comment: waitFor... what? waitForThreadExit() maybe?
Reply: Changed to
+    // If the thread is null, it will never be instantiated, since we closed the connection
+    // before we actually connected.
+    Thread thread;
+    synchronized (this) {
+      if (innerThread == null) {
+        return;
+      }
+      thread = innerThread;
+    }
+    thread.join();
+  }
 }
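For readers skimming the diff, the pattern both files converge on can be sketched in isolation: the thread is created only when `start()` is called, and the wait method returns immediately if no thread was ever created. This is a minimal sketch under stated assumptions, not the SDK's code. `LazyWorker` is a hypothetical class, `waitForThreadExit` borrows the name suggested in review, a plain `new Thread` stands in for the SDK's thread factory, and `IllegalStateException` stands in for Guava's `checkState`.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical illustration of lazy thread creation; not part of the SDK. */
final class LazyWorker {
  private final String threadName;
  private final Runnable body;
  private Thread thread; // guarded by "this"; stays null until start() is called

  LazyWorker(String threadName, Runnable body) {
    this.threadName = threadName;
    this.body = body;
  }

  /** Creates and starts the thread; a second call is a programming error. */
  synchronized void start() {
    if (thread != null) {
      throw new IllegalStateException("Thread already started");
    }
    thread = new Thread(body, threadName);
    thread.start();
  }

  /** Blocks until the thread exits, or returns at once if it was never created. */
  void waitForThreadExit() throws InterruptedException {
    Thread started;
    synchronized (this) {
      if (thread == null) {
        return;
      }
      started = thread;
    }
    // Join outside the lock so other synchronized methods are not blocked while waiting.
    started.join();
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicInteger runs = new AtomicInteger();

    LazyWorker neverStarted = new LazyWorker("Writer-1", runs::incrementAndGet);
    neverStarted.waitForThreadExit(); // no thread exists, so this returns immediately

    LazyWorker worker = new LazyWorker("Writer-2", runs::incrementAndGet);
    worker.start();
    worker.waitForThreadExit();
    System.out.println(runs.get()); // prints 1
  }
}
```

An object that is closed before it connects never allocates a thread here, which matches the comment in `waitFor()` above.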
Review comment:
FWIW, I mildly object to changing this since:
1. I don't see how playing with timing numbers is going to fix it... It's like adding a sleep to fix a race condition... It may work (sometimes), but it's not a good long-term fix. :-)
2. The goal of the jitter is to "smear" reconnecting clients over a larger period of time so they don't all hit the backend at the same time after a server restarts / has downtime / whatever. So in theory, this change would more than double the load on the server, since we're smearing over 25% of the time interval rather than 70%, and so we should talk to the backend team to make sure they're okay with this change.
But #1 isn't harmful and in practice #2 is likely inconsequential because the number of clients using the admin SDK will be extremely small compared to the number of clients using the mobile / web clients. So backend load isn't really an issue, and so if we want more predictable reconnect intervals, I guess it's fine. :-)
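To make the "smearing" arithmetic concrete, here is a minimal sketch of one common way to apply a jitter factor to a reconnect delay. The formula, the class `JitteredDelay`, and both method names are assumptions for illustration; the SDK's actual retry code is not shown in this PR. With this model, a jitter factor of 0.7 spreads reconnects over 70% of the base interval and 0.25 over 25%, so the same number of clients lands in a window 2.8 times narrower.

```java
import java.util.Random;

/** Hypothetical helper; the formula is an assumed model, not the SDK's implementation. */
final class JitteredDelay {
  private JitteredDelay() {}

  /**
   * Returns a delay between baseMillis * (1 - jitterFactor) and baseMillis:
   * a fixed part plus a random share of the jittered part.
   */
  static long jittered(long baseMillis, double jitterFactor, Random random) {
    if (baseMillis < 0 || jitterFactor < 0.0 || jitterFactor > 1.0) {
      throw new IllegalArgumentException("invalid base delay or jitter factor");
    }
    double fixedPart = (1.0 - jitterFactor) * baseMillis;
    double randomPart = jitterFactor * baseMillis * random.nextDouble();
    return (long) (fixedPart + randomPart);
  }

  /** Width of the window over which reconnect attempts are spread. */
  static long windowMillis(long baseMillis, double jitterFactor) {
    return Math.round(jitterFactor * baseMillis);
  }

  public static void main(String[] args) {
    long base = 1000;
    System.out.println(windowMillis(base, 0.7));  // prints 700
    System.out.println(windowMillis(base, 0.25)); // prints 250
    System.out.println(jittered(base, 0.25, new Random())); // some value from 750 to 1000
  }
}
```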
Reply:
It seems both thread creation and termination are very expensive in App Engine. Both involve RPC calls and heavy backend processing. One implication is that threads don't really die when our Runnables return (it seems); some internal cleanup still has to happen on the GAE end. Having some breathing room between thread termination and creation therefore helps. In my tests I observed a noticeable difference in error handling behavior when the reconnects are far apart in time.
However, those tests were conducted using an exponential backoff factor of 2. So perhaps dropping the jitter factor alone may not be enough. I'll revert this change for now. We can revisit it in the future, if we continue to have issues in GAE.
Review comment:
FWIW-
To test your other changes, it might be interesting to set the initial delay to 0 and the backoff factor to 1 temporarily to make sure reconnect timing is no longer a factor (obviously don't leave it deployed like this for an extended period of time :-)).
I'd be more comfortable increasing the initial reconnect time than the backoff factor... and if we decide that GAE is weird enough that we want to minimize the thread creation for reconnect attempts, we could change these settings. But I'd at least accompany it with some comments explaining the rationale for the numbers (i.e. have a specific target for the frequency with which we will tolerate creating / destroying threads).
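The two knobs discussed above can be sketched as a tiny schedule object. This is an illustrative sketch only: `BackoffSchedule`, its constructor parameters, and the 30-second cap are hypothetical and do not come from the SDK. It shows why an initial delay of 0 with a factor of 1 removes reconnect timing as a variable (every delay is 0), and how a factor of 2 spaces attempts further apart.

```java
/** Hypothetical exponential backoff schedule; names and values are illustrative. */
final class BackoffSchedule {
  private final long initialDelayMillis;
  private final double backoffFactor;
  private final long maxDelayMillis;
  private long lastDelayMillis = -1; // -1 means no attempt has been scheduled yet

  BackoffSchedule(long initialDelayMillis, double backoffFactor, long maxDelayMillis) {
    if (initialDelayMillis < 0 || backoffFactor < 1.0 || maxDelayMillis < initialDelayMillis) {
      throw new IllegalArgumentException("invalid backoff settings");
    }
    this.initialDelayMillis = initialDelayMillis;
    this.backoffFactor = backoffFactor;
    this.maxDelayMillis = maxDelayMillis;
  }

  /** Returns the delay before the next reconnect attempt, growing up to the cap. */
  long nextDelayMillis() {
    if (lastDelayMillis < 0) {
      lastDelayMillis = initialDelayMillis;
    } else {
      lastDelayMillis = Math.min(maxDelayMillis, (long) (lastDelayMillis * backoffFactor));
    }
    return lastDelayMillis;
  }

  /** Call after a successful connection so the next failure starts from the initial delay. */
  void reset() {
    lastDelayMillis = -1;
  }

  public static void main(String[] args) {
    BackoffSchedule normal = new BackoffSchedule(1000, 2.0, 30_000);
    for (int i = 0; i < 6; i++) {
      System.out.print(normal.nextDelayMillis() + " "); // 1000 2000 4000 8000 16000 30000
    }
    System.out.println();

    BackoffSchedule testOnly = new BackoffSchedule(0, 1.0, 30_000);
    System.out.println(testOnly.nextDelayMillis() + " " + testOnly.nextDelayMillis()); // 0 0
  }
}
```

Any production values would need the rationale comment the reviewer asks for, for example a target for how often threads may be created and destroyed.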