Skip to content

Commit dbda820

Browse files
authored
v3: fix websocket timeouts (#1220)
* prevent uncaught websocket exceptions
* dev cli sends regular pings to keep connection alive
* add changeset
* fix for prod message forwarding
* add server-side websocket ping
* Revert "dev cli sends regular pings to keep connection alive"
  This reverts commit 9a6a7a2.
* add dev cli websocket debug logs, always print errors
* actually start the ping service
* update changeset
* catch remaining dev cli message handler errors
1 parent e417aca commit dbda820

File tree

9 files changed

+276
-57
lines changed

9 files changed

+276
-57
lines changed

.changeset/curly-monkeys-tell.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
- Prevent uncaught exceptions when handling WebSocket messages
7+
- Improve CLI dev command WebSocket debug and error logging

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ import { clientWebsocketMessages, serverWebsocketMessages } from "@trigger.dev/c
22
import { ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
33
import { Evt } from "evt";
44
import { randomUUID } from "node:crypto";
5-
import type { CloseEvent, ErrorEvent, MessageEvent, WebSocket } from "ws";
5+
import type { CloseEvent, ErrorEvent, MessageEvent } from "ws";
6+
import { WebSocket } from "ws";
67
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
78
import { logger } from "~/services/logger.server";
89
import { DevQueueConsumer } from "./marqs/devQueueConsumer.server";
10+
import { HeartbeatService } from "./services/heartbeatService.server";
911

1012
export class AuthenticatedSocketConnection {
1113
public id: string;
@@ -14,6 +16,7 @@ export class AuthenticatedSocketConnection {
1416
private _sender: ZodMessageSender<typeof serverWebsocketMessages>;
1517
private _consumer: DevQueueConsumer;
1618
private _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;
19+
private _pingService: HeartbeatService;
1720

1821
constructor(
1922
public ws: WebSocket,
@@ -50,8 +53,42 @@ export class AuthenticatedSocketConnection {
5053
ws.addEventListener("close", this.#handleClose.bind(this));
5154
ws.addEventListener("error", this.#handleError.bind(this));
5255

56+
ws.on("ping", (data) => {
57+
logger.debug("[AuthenticatedSocketConnection] Received ping", {
58+
id: this.id,
59+
envId: this.authenticatedEnv.id,
60+
data,
61+
});
62+
});
63+
64+
ws.on("pong", (data) => {
65+
logger.debug("[AuthenticatedSocketConnection] Received pong", {
66+
id: this.id,
67+
envId: this.authenticatedEnv.id,
68+
data,
69+
});
70+
});
71+
72+
this._pingService = new HeartbeatService({
73+
heartbeat: async () => {
74+
if (ws.readyState !== WebSocket.OPEN) {
75+
logger.debug("[AuthenticatedSocketConnection] Websocket not open, skipping ping");
76+
return;
77+
}
78+
79+
logger.debug("[AuthenticatedSocketConnection] Sending ping", {
80+
id: this.id,
81+
envId: this.authenticatedEnv.id,
82+
});
83+
84+
ws.ping();
85+
},
86+
});
87+
this._pingService.start();
88+
5389
this._messageHandler = new ZodMessageHandler({
5490
schema: clientWebsocketMessages,
91+
logger,
5592
messages: {
5693
READY_FOR_TASKS: async (payload) => {
5794
await this._consumer.registerBackgroundWorker(
@@ -99,14 +136,29 @@ export class AuthenticatedSocketConnection {
99136
}
100137

101138
async #handleMessage(ev: MessageEvent) {
102-
const data = JSON.parse(ev.data.toString());
103-
104-
await this._messageHandler.handleMessage(data);
139+
try {
140+
const data = JSON.parse(ev.data.toString());
141+
142+
await this._messageHandler.handleMessage(data);
143+
} catch (error) {
144+
logger.error("[AuthenticatedSocketConnection] Failed to handle message", {
145+
error:
146+
error instanceof Error
147+
? {
148+
message: error.message,
149+
stack: error.stack,
150+
}
151+
: error,
152+
message: ev.data.toString(),
153+
});
154+
}
105155
}
106156

107157
async #handleClose(ev: CloseEvent) {
108158
logger.debug("[AuthenticatedSocketConnection] Websocket closed", { ev });
109159

160+
this._pingService.stop();
161+
110162
await this._consumer.stop();
111163

112164
const result = this.onClose.post(ev);

apps/webapp/app/v3/services/heartbeatService.server.ts

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
type HeartbeatServiceOptions = {
  /** Callback invoked once per heartbeat tick. */
  heartbeat: () => Promise<void>;
  /** Delay between the end of one heartbeat and the start of the next, in ms. Defaults to 45_000. */
  pingIntervalInMs?: number;
  /** When true, `start()` fires the first heartbeat immediately instead of waiting one interval. */
  leadingEdge?: boolean;
  /**
   * Called when `heartbeat` throws or rejects. Defaults to logging via `console.error`.
   * The heartbeat loop keeps running regardless of what this handler does.
   */
  onError?: (error: unknown) => void;
};

/**
 * Repeatedly invokes an async `heartbeat` callback, waiting `pingIntervalInMs`
 * after each invocation settles before starting the next one.
 *
 * Guarantees:
 * - A heartbeat that throws or rejects is reported through `onError` and does
 *   not break the loop or surface as an unhandled rejection.
 * - After `stop()` no further heartbeat is scheduled, even if one is still in
 *   flight at the time `stop()` is called.
 * - At most one timer is pending at any time; calling `start()` on a running
 *   service is a no-op.
 */
export class HeartbeatService {
  private _heartbeat: () => Promise<void>;
  private _heartbeatIntervalInMs: number;
  private _nextHeartbeat: NodeJS.Timeout | undefined;
  private _leadingEdge: boolean;
  private _onError: (error: unknown) => void;
  private _running: boolean;

  constructor(opts: HeartbeatServiceOptions) {
    this._heartbeat = opts.heartbeat;
    this._heartbeatIntervalInMs = opts.pingIntervalInMs ?? 45_000;
    this._nextHeartbeat = undefined;
    this._leadingEdge = opts.leadingEdge ?? false;
    this._onError =
      opts.onError ??
      ((error) => {
        console.error("[HeartbeatService] Heartbeat failed", error);
      });
    this._running = false;
  }

  /** Starts the heartbeat loop. Does nothing if the service is already running. */
  start() {
    if (this._running) {
      // A second start() would otherwise arm a second timer whose handle is lost.
      return;
    }

    this._running = true;

    if (this._leadingEdge) {
      // #doHeartbeat never rejects (errors are routed to onError), so it is safe to not await.
      void this.#doHeartbeat();
    } else {
      this.#scheduleNextHeartbeat();
    }
  }

  /** Stops the loop and cancels any pending timer. An in-flight heartbeat is allowed to finish. */
  stop() {
    this._running = false;
    this.#clearNextHeartbeat();
  }

  #doHeartbeat = async () => {
    this.#clearNextHeartbeat();

    try {
      await this._heartbeat();
    } catch (error) {
      this.#reportError(error);
    }

    // stop() may have been called while the heartbeat was awaiting; do not re-arm in that case.
    if (this._running) {
      this.#scheduleNextHeartbeat();
    }
  };

  #reportError(error: unknown) {
    try {
      this._onError(error);
    } catch (handlerError) {
      // A faulty error handler must not take the heartbeat loop down with it.
      console.error("[HeartbeatService] onError handler threw", handlerError);
    }
  }

  #clearNextHeartbeat() {
    if (this._nextHeartbeat) {
      clearTimeout(this._nextHeartbeat);
      this._nextHeartbeat = undefined;
    }
  }

  #scheduleNextHeartbeat() {
    // Clear first so a stop()/start() cycle during an in-flight heartbeat cannot leave two timers armed.
    this.#clearNextHeartbeat();
    this._nextHeartbeat = setTimeout(this.#doHeartbeat, this._heartbeatIntervalInMs);
  }
}

apps/webapp/app/v3/sharedSocketConnection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export class SharedSocketConnection {
9696

9797
this._messageHandler = new ZodMessageHandler({
9898
schema: clientWebsocketMessages,
99+
logger,
99100
messages: {
100101
READY_FOR_TASKS: async (payload) => {
101102
this._sharedQueueConsumerPool.start();

apps/webapp/app/v3/utils/zodPubSub.server.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
3939
this._subscriber = new Redis(_options.redis);
4040
this._messageHandler = new ZodMessageHandler({
4141
schema: _options.schema,
42+
logger: this._logger,
4243
});
4344
}
4445

@@ -76,20 +77,26 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
7677

7778
const message = this._messageHandler.parseMessage(parsedMessage);
7879

79-
if (typeof message.type !== "string") {
80+
if (!message.success) {
81+
this._logger.error(`Failed to parse message: ${message.error}`, { parsedMessage });
8082
return;
8183
}
8284

83-
const listener = this._listeners.get(message.type);
85+
if (typeof message.data.type !== "string") {
86+
this._logger.error(`Failed to parse message: invalid type`, { parsedMessage });
87+
return;
88+
}
89+
90+
const listener = this._listeners.get(message.data.type);
8491

8592
if (!listener) {
86-
this._logger.debug(`No listener for message type: ${message.type}`, { parsedMessage });
93+
this._logger.debug(`No listener for message type: ${message.data.type}`, { parsedMessage });
8794

8895
return;
8996
}
9097

9198
try {
92-
await listener(message.payload);
99+
await listener(message.data.payload);
93100
} catch (error) {
94101
this._logger.error("Error handling message", { error, message });
95102
}

packages/cli-v3/src/commands/dev.tsx

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -293,9 +293,18 @@ function useDev({
293293
`${dashboardUrl}/projects/v3/${config.project}`
294294
);
295295

296-
websocket.addEventListener("open", async (event) => {});
297-
websocket.addEventListener("close", (event) => {});
298-
websocket.addEventListener("error", (event) => {});
296+
websocket.addEventListener("open", async (event) => {
297+
logger.debug("WebSocket opened", { event });
298+
});
299+
300+
websocket.addEventListener("close", (event) => {
301+
logger.debug("WebSocket closed", { event });
302+
});
303+
304+
websocket.addEventListener("error", (event) => {
305+
logger.log(`${chalkError("WebSocketError:")} ${event.error.message}`);
306+
logger.debug("WebSocket error", { event, rawError: event.error });
307+
});
299308

300309
// This is the deprecated task heart beat that uses the friendly attempt ID
301310
// It will only be used if the worker does not support lazy attempts
@@ -362,31 +371,42 @@ function useDev({
362371
});
363372

364373
websocket.addEventListener("message", async (event) => {
365-
const data = JSON.parse(
366-
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
367-
);
374+
try {
375+
const data = JSON.parse(
376+
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
377+
);
368378

369-
const messageHandler = new ZodMessageHandler({
370-
schema: serverWebsocketMessages,
371-
messages: {
372-
SERVER_READY: async (payload) => {
373-
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
374-
await sender.send("READY_FOR_TASKS", {
375-
backgroundWorkerId: worker.id,
376-
inProgressRuns: worker.worker.inProgressRuns,
377-
});
378-
}
379-
},
380-
BACKGROUND_WORKER_MESSAGE: async (payload) => {
381-
await backgroundWorkerCoordinator.handleMessage(
382-
payload.backgroundWorkerId,
383-
payload.data
384-
);
379+
const messageHandler = new ZodMessageHandler({
380+
schema: serverWebsocketMessages,
381+
messages: {
382+
SERVER_READY: async (payload) => {
383+
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
384+
await sender.send("READY_FOR_TASKS", {
385+
backgroundWorkerId: worker.id,
386+
inProgressRuns: worker.worker.inProgressRuns,
387+
});
388+
}
389+
},
390+
BACKGROUND_WORKER_MESSAGE: async (payload) => {
391+
await backgroundWorkerCoordinator.handleMessage(
392+
payload.backgroundWorkerId,
393+
payload.data
394+
);
395+
},
385396
},
386-
},
387-
});
397+
});
388398

389-
await messageHandler.handleMessage(data);
399+
await messageHandler.handleMessage(data);
400+
} catch (error) {
401+
if (error instanceof Error) {
402+
logger.error("Error while handling websocket message", { error: error.message });
403+
} else {
404+
logger.error(
405+
"Unkown error while handling websocket message, use `-l debug` for additional output"
406+
);
407+
logger.debug("Error while handling websocket message", { error });
408+
}
409+
}
390410
});
391411

392412
let ctx: BuildContext | undefined;

packages/cli-v3/src/workers/dev/backgroundWorker.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -379,20 +379,32 @@ export class BackgroundWorker {
379379
child.on("message", async (msg: any) => {
380380
const message = this._handler.parseMessage(msg);
381381

382-
if (message.type === "TASKS_READY" && !resolved) {
382+
if (!message.success) {
383383
clearTimeout(timeout);
384384
resolved = true;
385-
resolve(message.payload.tasks);
385+
reject(new Error(`Failed to parse message: ${message.error}`));
386386
child.kill();
387-
} else if (message.type === "UNCAUGHT_EXCEPTION") {
387+
return;
388+
}
389+
390+
if (message.data.type === "TASKS_READY" && !resolved) {
391+
clearTimeout(timeout);
392+
resolved = true;
393+
resolve(message.data.payload.tasks);
394+
child.kill();
395+
} else if (message.data.type === "UNCAUGHT_EXCEPTION") {
388396
clearTimeout(timeout);
389397
resolved = true;
390-
reject(new UncaughtExceptionError(message.payload.error, message.payload.origin));
398+
reject(
399+
new UncaughtExceptionError(message.data.payload.error, message.data.payload.origin)
400+
);
391401
child.kill();
392-
} else if (message.type === "TASKS_FAILED_TO_PARSE") {
402+
} else if (message.data.type === "TASKS_FAILED_TO_PARSE") {
393403
clearTimeout(timeout);
394404
resolved = true;
395-
reject(new TaskMetadataParseError(message.payload.zodIssues, message.payload.tasks));
405+
reject(
406+
new TaskMetadataParseError(message.data.payload.zodIssues, message.data.payload.tasks)
407+
);
396408
child.kill();
397409
}
398410
});
@@ -942,9 +954,14 @@ class TaskRunProcess {
942954
async #handleMessage(msg: any) {
943955
const message = this._handler.parseMessage(msg);
944956

945-
switch (message.type) {
957+
if (!message.success) {
958+
logger.error(`Dropping message: ${message.error}`, { message });
959+
return;
960+
}
961+
962+
switch (message.data.type) {
946963
case "TASK_RUN_COMPLETED": {
947-
const { result, execution } = message.payload;
964+
const { result, execution } = message.data.payload;
948965

949966
logger.debug(`[${this.runId}] task run completed`, {
950967
result,
@@ -981,7 +998,7 @@ class TaskRunProcess {
981998
if (this.messageId) {
982999
this.onTaskRunHeartbeat.post(this.messageId);
9831000
} else {
984-
this.onTaskHeartbeat.post(message.payload.id);
1001+
this.onTaskHeartbeat.post(message.data.payload.id);
9851002
}
9861003

9871004
break;

packages/core/src/v3/schemas/messages.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,11 @@ export const ClientToSharedQueueMessages = {
621621
data: BackgroundWorkerClientMessages,
622622
}),
623623
},
624+
PING: {
625+
message: z.object({
626+
version: z.literal("v1").default("v1"),
627+
}),
628+
},
624629
};
625630

626631
export const SharedQueueToClientMessages = {

0 commit comments

Comments
 (0)