Skip to content

Commit 9a6a7a2

Browse files
committed
dev cli sends regular pings to keep connection alive
1 parent a974e8d commit 9a6a7a2

File tree

3 files changed

+103
-31
lines changed

3 files changed

+103
-31
lines changed

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ export class AuthenticatedSocketConnection {
9191
}
9292
}
9393
},
94+
PING: async () => {
95+
logger.debug("[AuthenticatedSocketConnection] Received ping", {
96+
id: this.id,
97+
envId: this.authenticatedEnv.id,
98+
});
99+
await this._sender.send("PONG", {});
100+
},
94101
},
95102
});
96103
}

packages/cli-v3/src/commands/dev.tsx

Lines changed: 90 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,65 @@ function useDev({
293293
`${dashboardUrl}/projects/v3/${config.project}`
294294
);
295295

296-
websocket.addEventListener("open", async (event) => {});
297-
websocket.addEventListener("close", (event) => {});
298-
websocket.addEventListener("error", (event) => {});
296+
const messageHandler = new ZodMessageHandler({
297+
schema: serverWebsocketMessages,
298+
messages: {
299+
SERVER_READY: async (payload) => {
300+
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
301+
await sender.send("READY_FOR_TASKS", {
302+
backgroundWorkerId: worker.id,
303+
inProgressRuns: worker.worker.inProgressRuns,
304+
});
305+
}
306+
},
307+
BACKGROUND_WORKER_MESSAGE: async (payload) => {
308+
await backgroundWorkerCoordinator.handleMessage(payload.backgroundWorkerId, payload.data);
309+
},
310+
PONG: async () => {
311+
logger.debug("Received pong", { timestamp: Date.now() });
312+
},
313+
},
314+
});
315+
316+
websocket.addEventListener("message", async (event) => {
317+
const data = JSON.parse(
318+
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
319+
);
320+
321+
await messageHandler.handleMessage(data);
322+
});
323+
324+
const ping = new WebsocketPing({
325+
callback: async () => {
326+
if (websocket.readyState !== WebSocket.OPEN) {
327+
logger.debug("Websocket not open, skipping ping");
328+
return;
329+
}
330+
331+
logger.debug("Sending ping", { timestamp: Date.now() });
332+
333+
await sender.send("PING", {});
334+
},
335+
});
336+
337+
websocket.addEventListener("open", async (event) => {
338+
logger.debug("Websocket opened", { event });
339+
340+
ping.start();
341+
});
342+
343+
websocket.addEventListener("close", (event) => {
344+
logger.debug("Websocket closed", { event });
345+
346+
ping.stop();
347+
});
348+
349+
websocket.addEventListener("error", (event) => {
350+
logger.log(`${chalkError("Websocket Error:")} ${event.error.message}`);
351+
logger.debug("Websocket error", { event, rawError: event.error });
352+
353+
ping.stop();
354+
});
299355

300356
// This is the deprecated task heart beat that uses the friendly attempt ID
301357
// It will only be used if the worker does not support lazy attempts
@@ -361,34 +417,6 @@ function useDev({
361417
});
362418
});
363419

364-
websocket.addEventListener("message", async (event) => {
365-
const data = JSON.parse(
366-
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
367-
);
368-
369-
const messageHandler = new ZodMessageHandler({
370-
schema: serverWebsocketMessages,
371-
messages: {
372-
SERVER_READY: async (payload) => {
373-
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
374-
await sender.send("READY_FOR_TASKS", {
375-
backgroundWorkerId: worker.id,
376-
inProgressRuns: worker.worker.inProgressRuns,
377-
});
378-
}
379-
},
380-
BACKGROUND_WORKER_MESSAGE: async (payload) => {
381-
await backgroundWorkerCoordinator.handleMessage(
382-
payload.backgroundWorkerId,
383-
payload.data
384-
);
385-
},
386-
},
387-
});
388-
389-
await messageHandler.handleMessage(data);
390-
});
391-
392420
let ctx: BuildContext | undefined;
393421

394422
let firstBuild = true;
@@ -972,3 +1000,34 @@ function createResolveEnvironmentVariablesFunction(configModule?: any) {
9721000
return resolvedEnvVars;
9731001
};
9741002
}
1003+
1004+
type WebsocketPingOptions = {
1005+
callback: () => Promise<void>;
1006+
pingIntervalInMs?: number;
1007+
};
1008+
1009+
class WebsocketPing {
1010+
private _callback: () => Promise<void>;
1011+
private _pingIntervalInMs: number;
1012+
private _nextPingIteration: NodeJS.Timeout | undefined;
1013+
1014+
constructor(opts: WebsocketPingOptions) {
1015+
this._callback = opts.callback;
1016+
this._pingIntervalInMs = opts.pingIntervalInMs ?? 45_000;
1017+
this._nextPingIteration = undefined;
1018+
}
1019+
1020+
start() {
1021+
this.#sendPing();
1022+
}
1023+
1024+
stop() {
1025+
clearTimeout(this._nextPingIteration);
1026+
}
1027+
1028+
#sendPing = async () => {
1029+
await this._callback();
1030+
1031+
this._nextPingIteration = setTimeout(this.#sendPing, this._pingIntervalInMs);
1032+
};
1033+
}

packages/core/src/v3/schemas/messages.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ export const serverWebsocketMessages = {
5757
backgroundWorkerId: z.string(),
5858
data: BackgroundWorkerServerMessages,
5959
}),
60+
PONG: z.object({
61+
version: z.literal("v1").default("v1"),
62+
}),
6063
};
6164

6265
export const BackgroundWorkerClientMessages = z.discriminatedUnion("type", [
@@ -108,6 +111,9 @@ export const clientWebsocketMessages = {
108111
backgroundWorkerId: z.string(),
109112
data: BackgroundWorkerClientMessages,
110113
}),
114+
PING: z.object({
115+
version: z.literal("v1").default("v1"),
116+
}),
111117
};
112118

113119
export const workerToChildMessages = {

0 commit comments

Comments
 (0)