Skip to content

Commit 8acb13b

Browse files
authored
Fixing a Thread Leak in Database Client Code (Related to #29) (#30)
* Creating Websocket reader and writer threads lazily to avoid the thread leak described in #29; Handling thread interrupts from Websocket receiver thread (not comprehensive due to the blocking nature of IO calls; Reducing the default jitter factor to 25% * Rearranging the code for clarity; Reverting the change to jitter factor (needs more testing)
1 parent 1200aef commit 8acb13b

File tree

3 files changed

+65
-37
lines changed

3 files changed

+65
-37
lines changed

src/main/java/com/google/firebase/database/tubesock/WebSocket.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.firebase.database.tubesock;
1818

19+
import static com.google.common.base.Preconditions.checkState;
20+
1921
import java.io.DataInputStream;
2022
import java.io.IOException;
2123
import java.io.OutputStream;
@@ -64,7 +66,7 @@ public void setName(Thread t, String name) {
6466
private final WebSocketWriter writer;
6567
private final WebSocketHandshake handshake;
6668
private final int clientId = clientCount.incrementAndGet();
67-
private final Thread innerThread;
69+
private Thread innerThread;
6870
private volatile State state = State.NONE;
6971
private volatile Socket socket = null;
7072
private WebSocketEventHandler eventHandler = null;
@@ -99,15 +101,6 @@ public WebSocket(URI url, String protocol) {
99101
* if not extra headers are requested
100102
*/
101103
public WebSocket(URI url, String protocol, Map<String, String> extraHeaders) {
102-
innerThread =
103-
getThreadFactory()
104-
.newThread(
105-
new Runnable() {
106-
@Override
107-
public void run() {
108-
runReader();
109-
}
110-
});
111104
this.url = url;
112105
handshake = new WebSocketHandshake(url, protocol, extraHeaders);
113106
receiver = new WebSocketReceiver(this);
@@ -150,9 +143,23 @@ public synchronized void connect() {
150143
close();
151144
return;
152145
}
153-
getIntializer().setName(getInnerThread(), THREAD_BASE_NAME + "Reader-" + clientId);
154146
state = State.CONNECTING;
155-
getInnerThread().start();
147+
start();
148+
}
149+
150+
private synchronized void start() {
151+
checkState(innerThread == null, "Inner thread already started");
152+
innerThread =
153+
getThreadFactory()
154+
.newThread(
155+
new Runnable() {
156+
@Override
157+
public void run() {
158+
runReader();
159+
}
160+
});
161+
getIntializer().setName(innerThread, THREAD_BASE_NAME + "Reader-" + clientId);
162+
innerThread.start();
156163
}
157164

158165
/**
@@ -312,13 +319,15 @@ private Socket createSocket() {
312319
* convenience method to make sure everything shuts down, if desired.
313320
*/
314321
public void blockClose() throws InterruptedException {
315-
// If the thread is new, it will never run, since we closed the connection before we
316-
// actually
317-
// connected
318-
if (writer.getInnerThread().getState() != Thread.State.NEW) {
319-
writer.getInnerThread().join();
322+
writer.waitForTermination();
323+
Thread thread;
324+
synchronized (this) {
325+
if (innerThread == null) {
326+
return;
327+
}
328+
thread = innerThread;
320329
}
321-
getInnerThread().join();
330+
thread.join();
322331
}
323332

324333
private void runReader() {
@@ -389,7 +398,7 @@ private void runReader() {
389398
writer.setOutput(output);
390399
receiver.setInput(input);
391400
state = WebSocket.State.CONNECTED;
392-
writer.getInnerThread().start();
401+
writer.start();
393402
eventHandler.onOpen();
394403
receiver.run();
395404
} catch (WebSocketException wse) {
@@ -402,10 +411,6 @@ private void runReader() {
402411
}
403412
}
404413

405-
Thread getInnerThread() {
406-
return innerThread;
407-
}
408-
409414
private enum State {
410415
NONE,
411416
CONNECTING,

src/main/java/com/google/firebase/database/tubesock/WebSocketReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ void run() {
4747
this.eventHandler = websocket.getEventHandler();
4848
while (!stop) {
4949
try {
50+
if (Thread.interrupted()) {
51+
throw new InterruptedException();
52+
}
5053
int offset = 0;
5154
offset += read(inputHeader, offset, 1);
5255
boolean fin = (inputHeader[0] & 0x80) != 0;
@@ -94,6 +97,8 @@ void run() {
9497
continue;
9598
} catch (IOException ioe) {
9699
handleError(new WebSocketException("IO Error", ioe));
100+
} catch (InterruptedException e) {
101+
handleError(new WebSocketException("Receiver interrupted", e));
97102
} catch (WebSocketException e) {
98103
handleError(e);
99104
}

src/main/java/com/google/firebase/database/tubesock/WebSocketWriter.java

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.firebase.database.tubesock;
1818

19+
import static com.google.common.base.Preconditions.checkState;
20+
1921
import java.io.IOException;
2022
import java.io.OutputStream;
2123
import java.nio.ByteBuffer;
@@ -32,26 +34,18 @@
3234
class WebSocketWriter {
3335

3436
private final Random random = new Random();
35-
private final Thread innerThread;
37+
private final String threadName;
38+
39+
private Thread innerThread;
3640
private BlockingQueue<ByteBuffer> pendingBuffers;
3741
private volatile boolean stop = false;
3842
private boolean closeSent = false;
3943
private WebSocket websocket;
4044
private WritableByteChannel channel;
4145

4246
WebSocketWriter(WebSocket websocket, String threadBaseName, int clientId) {
43-
innerThread =
44-
WebSocket.getThreadFactory()
45-
.newThread(
46-
new Runnable() {
47-
@Override
48-
public void run() {
49-
runWriter();
50-
}
51-
});
52-
53-
WebSocket.getIntializer().setName(getInnerThread(), threadBaseName + "Writer-" + clientId);
5447
this.websocket = websocket;
48+
this.threadName = threadBaseName + "Writer-" + clientId;
5549
pendingBuffers = new LinkedBlockingQueue<>();
5650
}
5751

@@ -166,7 +160,31 @@ private void runWriter() {
166160
}
167161
}
168162

169-
Thread getInnerThread() {
170-
return innerThread;
163+
synchronized void start() {
164+
checkState(innerThread == null, "Inner thread already started");
165+
innerThread =
166+
WebSocket.getThreadFactory()
167+
.newThread(
168+
new Runnable() {
169+
@Override
170+
public void run() {
171+
runWriter();
172+
}
173+
});
174+
WebSocket.getIntializer().setName(innerThread, threadName);
175+
innerThread.start();
176+
}
177+
178+
void waitForTermination() throws InterruptedException {
179+
// If the thread is null, it will never be instantiated, since we closed the connection
180+
// before we actually connected.
181+
Thread thread;
182+
synchronized (this) {
183+
if (innerThread == null) {
184+
return;
185+
}
186+
thread = innerThread;
187+
}
188+
thread.join();
171189
}
172190
}

0 commit comments

Comments
 (0)