Skip to content

v3: fix websocket timeouts #1220

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Jul 17, 2024
7 changes: 7 additions & 0 deletions .changeset/curly-monkeys-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"trigger.dev": patch
"@trigger.dev/core": patch
---

- Prevent uncaught exceptions when handling WebSocket messages
- Improve CLI dev command WebSocket debug and error logging
60 changes: 56 additions & 4 deletions apps/webapp/app/v3/authenticatedSocketConnection.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { clientWebsocketMessages, serverWebsocketMessages } from "@trigger.dev/c
import { ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
import { Evt } from "evt";
import { randomUUID } from "node:crypto";
import type { CloseEvent, ErrorEvent, MessageEvent, WebSocket } from "ws";
import type { CloseEvent, ErrorEvent, MessageEvent } from "ws";
import { WebSocket } from "ws";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { DevQueueConsumer } from "./marqs/devQueueConsumer.server";
import { HeartbeatService } from "./services/heartbeatService.server";

export class AuthenticatedSocketConnection {
public id: string;
Expand All @@ -14,6 +16,7 @@ export class AuthenticatedSocketConnection {
private _sender: ZodMessageSender<typeof serverWebsocketMessages>;
private _consumer: DevQueueConsumer;
private _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;
private _pingService: HeartbeatService;

constructor(
public ws: WebSocket,
Expand Down Expand Up @@ -50,8 +53,42 @@ export class AuthenticatedSocketConnection {
ws.addEventListener("close", this.#handleClose.bind(this));
ws.addEventListener("error", this.#handleError.bind(this));

ws.on("ping", (data) => {
logger.debug("[AuthenticatedSocketConnection] Received ping", {
id: this.id,
envId: this.authenticatedEnv.id,
data,
});
});

ws.on("pong", (data) => {
logger.debug("[AuthenticatedSocketConnection] Received pong", {
id: this.id,
envId: this.authenticatedEnv.id,
data,
});
});

this._pingService = new HeartbeatService({
heartbeat: async () => {
if (ws.readyState !== WebSocket.OPEN) {
logger.debug("[AuthenticatedSocketConnection] Websocket not open, skipping ping");
return;
}

logger.debug("[AuthenticatedSocketConnection] Sending ping", {
id: this.id,
envId: this.authenticatedEnv.id,
});

ws.ping();
},
});
this._pingService.start();

this._messageHandler = new ZodMessageHandler({
schema: clientWebsocketMessages,
logger,
messages: {
READY_FOR_TASKS: async (payload) => {
await this._consumer.registerBackgroundWorker(
Expand Down Expand Up @@ -99,14 +136,29 @@ export class AuthenticatedSocketConnection {
}

async #handleMessage(ev: MessageEvent) {
const data = JSON.parse(ev.data.toString());

await this._messageHandler.handleMessage(data);
try {
const data = JSON.parse(ev.data.toString());

await this._messageHandler.handleMessage(data);
} catch (error) {
logger.error("[AuthenticatedSocketConnection] Failed to handle message", {
error:
error instanceof Error
? {
message: error.message,
stack: error.stack,
}
: error,
message: ev.data.toString(),
});
}
}

async #handleClose(ev: CloseEvent) {
logger.debug("[AuthenticatedSocketConnection] Websocket closed", { ev });

this._pingService.stop();

await this._consumer.stop();

const result = this.onClose.post(ev);
Expand Down
49 changes: 49 additions & 0 deletions apps/webapp/app/v3/services/heartbeatService.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/**
 * Options accepted by {@link HeartbeatService}.
 */
type HeartbeatServiceOptions = {
  /** Callback invoked on every beat. */
  heartbeat: () => Promise<void>;
  /** Delay between the end of one beat and the start of the next, in milliseconds. Defaults to 45_000. */
  pingIntervalInMs?: number;
  /** When true, `start()` runs the first beat immediately instead of waiting one interval. Defaults to false. */
  leadingEdge?: boolean;
  /**
   * Called when `heartbeat` throws or rejects. When omitted, the error is
   * written to `console.error` so it is never silently lost.
   */
  onError?: (error: unknown) => void | Promise<void>;
};

/**
 * Repeatedly invokes an async `heartbeat` callback, waiting a fixed interval
 * between the completion of one beat and the start of the next.
 *
 * Guarantees:
 * - `stop()` always ends the loop, even if a beat is still awaiting.
 * - A failing beat is reported (via `onError` or `console.error`) and does not
 *   end the loop or surface as an unhandled promise rejection.
 * - Calling `start()` again restarts the loop rather than running two loops.
 */
export class HeartbeatService {
  private _heartbeat: () => Promise<void>;
  private _heartbeatIntervalInMs: number;
  private _nextHeartbeat: ReturnType<typeof setTimeout> | undefined;
  private _leadingEdge: boolean;
  private _onError: ((error: unknown) => void | Promise<void>) | undefined;
  // Bumped on every start()/stop(). A beat only reschedules itself when the
  // generation it started under is still the current one.
  private _generation: number;

  constructor(opts: HeartbeatServiceOptions) {
    this._heartbeat = opts.heartbeat;
    this._heartbeatIntervalInMs = opts.pingIntervalInMs ?? 45_000;
    this._nextHeartbeat = undefined;
    this._leadingEdge = opts.leadingEdge ?? false;
    this._onError = opts.onError;
    this._generation = 0;
  }

  /**
   * Starts (or restarts) the heartbeat loop. With `leadingEdge` the first beat
   * begins synchronously; otherwise it is scheduled one interval from now.
   */
  start() {
    // Invalidate any in-flight beat and pending timer from a previous start().
    this._generation++;
    this.#clearNextHeartbeat();

    if (this._leadingEdge) {
      void this.#doHeartbeat();
    } else {
      this.#scheduleNextHeartbeat();
    }
  }

  /**
   * Stops the loop. A beat that is currently awaiting is allowed to finish but
   * will not schedule a successor.
   */
  stop() {
    this._generation++;
    this.#clearNextHeartbeat();
  }

  // Arrow-function field so it can be handed to setTimeout without binding.
  #doHeartbeat = async () => {
    const generation = this._generation;

    this.#clearNextHeartbeat();

    try {
      await this._heartbeat();
    } catch (error) {
      await this.#reportError(error);
    }

    // stop() or a newer start() happened while we were awaiting: do not reschedule.
    if (generation !== this._generation) {
      return;
    }

    this.#scheduleNextHeartbeat();
  };

  // Routes a heartbeat failure to the configured handler; never throws.
  async #reportError(error: unknown) {
    if (!this._onError) {
      console.error("[HeartbeatService] Heartbeat failed", error);
      return;
    }

    try {
      await this._onError(error);
    } catch (handlerError) {
      console.error("[HeartbeatService] Error handler failed", handlerError);
    }
  }

  #clearNextHeartbeat() {
    if (this._nextHeartbeat) {
      clearTimeout(this._nextHeartbeat);
      this._nextHeartbeat = undefined;
    }
  }

  #scheduleNextHeartbeat() {
    this._nextHeartbeat = setTimeout(this.#doHeartbeat, this._heartbeatIntervalInMs);
  }
}
1 change: 1 addition & 0 deletions apps/webapp/app/v3/sharedSocketConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export class SharedSocketConnection {

this._messageHandler = new ZodMessageHandler({
schema: clientWebsocketMessages,
logger,
messages: {
READY_FOR_TASKS: async (payload) => {
this._sharedQueueConsumerPool.start();
Expand Down
15 changes: 11 additions & 4 deletions apps/webapp/app/v3/utils/zodPubSub.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
this._subscriber = new Redis(_options.redis);
this._messageHandler = new ZodMessageHandler({
schema: _options.schema,
logger: this._logger,
});
}

Expand Down Expand Up @@ -76,20 +77,26 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>

const message = this._messageHandler.parseMessage(parsedMessage);

if (typeof message.type !== "string") {
if (!message.success) {
this._logger.error(`Failed to parse message: ${message.error}`, { parsedMessage });
return;
}

const listener = this._listeners.get(message.type);
if (typeof message.data.type !== "string") {
this._logger.error(`Failed to parse message: invalid type`, { parsedMessage });
return;
}

const listener = this._listeners.get(message.data.type);

if (!listener) {
this._logger.debug(`No listener for message type: ${message.type}`, { parsedMessage });
this._logger.debug(`No listener for message type: ${message.data.type}`, { parsedMessage });

return;
}

try {
await listener(message.payload);
await listener(message.data.payload);
} catch (error) {
this._logger.error("Error handling message", { error, message });
}
Expand Down
70 changes: 45 additions & 25 deletions packages/cli-v3/src/commands/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,18 @@ function useDev({
`${dashboardUrl}/projects/v3/${config.project}`
);

websocket.addEventListener("open", async (event) => {});
websocket.addEventListener("close", (event) => {});
websocket.addEventListener("error", (event) => {});
websocket.addEventListener("open", async (event) => {
logger.debug("WebSocket opened", { event });
});

websocket.addEventListener("close", (event) => {
logger.debug("WebSocket closed", { event });
});

websocket.addEventListener("error", (event) => {
logger.log(`${chalkError("WebSocketError:")} ${event.error.message}`);
logger.debug("WebSocket error", { event, rawError: event.error });
});

// This is the deprecated task heart beat that uses the friendly attempt ID
// It will only be used if the worker does not support lazy attempts
Expand Down Expand Up @@ -362,31 +371,42 @@ function useDev({
});

websocket.addEventListener("message", async (event) => {
const data = JSON.parse(
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
);
try {
const data = JSON.parse(
typeof event.data === "string" ? event.data : new TextDecoder("utf-8").decode(event.data)
);

const messageHandler = new ZodMessageHandler({
schema: serverWebsocketMessages,
messages: {
SERVER_READY: async (payload) => {
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
await sender.send("READY_FOR_TASKS", {
backgroundWorkerId: worker.id,
inProgressRuns: worker.worker.inProgressRuns,
});
}
},
BACKGROUND_WORKER_MESSAGE: async (payload) => {
await backgroundWorkerCoordinator.handleMessage(
payload.backgroundWorkerId,
payload.data
);
const messageHandler = new ZodMessageHandler({
schema: serverWebsocketMessages,
messages: {
SERVER_READY: async (payload) => {
for (const worker of backgroundWorkerCoordinator.currentWorkers) {
await sender.send("READY_FOR_TASKS", {
backgroundWorkerId: worker.id,
inProgressRuns: worker.worker.inProgressRuns,
});
}
},
BACKGROUND_WORKER_MESSAGE: async (payload) => {
await backgroundWorkerCoordinator.handleMessage(
payload.backgroundWorkerId,
payload.data
);
},
},
},
});
});

await messageHandler.handleMessage(data);
await messageHandler.handleMessage(data);
} catch (error) {
if (error instanceof Error) {
logger.error("Error while handling websocket message", { error: error.message });
} else {
logger.error(
"Unkown error while handling websocket message, use `-l debug` for additional output"
);
logger.debug("Error while handling websocket message", { error });
}
}
});

let ctx: BuildContext | undefined;
Expand Down
35 changes: 26 additions & 9 deletions packages/cli-v3/src/workers/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -379,20 +379,32 @@ export class BackgroundWorker {
child.on("message", async (msg: any) => {
const message = this._handler.parseMessage(msg);

if (message.type === "TASKS_READY" && !resolved) {
if (!message.success) {
clearTimeout(timeout);
resolved = true;
resolve(message.payload.tasks);
reject(new Error(`Failed to parse message: ${message.error}`));
child.kill();
} else if (message.type === "UNCAUGHT_EXCEPTION") {
return;
}

if (message.data.type === "TASKS_READY" && !resolved) {
clearTimeout(timeout);
resolved = true;
resolve(message.data.payload.tasks);
child.kill();
} else if (message.data.type === "UNCAUGHT_EXCEPTION") {
clearTimeout(timeout);
resolved = true;
reject(new UncaughtExceptionError(message.payload.error, message.payload.origin));
reject(
new UncaughtExceptionError(message.data.payload.error, message.data.payload.origin)
);
child.kill();
} else if (message.type === "TASKS_FAILED_TO_PARSE") {
} else if (message.data.type === "TASKS_FAILED_TO_PARSE") {
clearTimeout(timeout);
resolved = true;
reject(new TaskMetadataParseError(message.payload.zodIssues, message.payload.tasks));
reject(
new TaskMetadataParseError(message.data.payload.zodIssues, message.data.payload.tasks)
);
child.kill();
}
});
Expand Down Expand Up @@ -942,9 +954,14 @@ class TaskRunProcess {
async #handleMessage(msg: any) {
const message = this._handler.parseMessage(msg);

switch (message.type) {
if (!message.success) {
logger.error(`Dropping message: ${message.error}`, { message });
return;
}

switch (message.data.type) {
case "TASK_RUN_COMPLETED": {
const { result, execution } = message.payload;
const { result, execution } = message.data.payload;

logger.debug(`[${this.runId}] task run completed`, {
result,
Expand Down Expand Up @@ -981,7 +998,7 @@ class TaskRunProcess {
if (this.messageId) {
this.onTaskRunHeartbeat.post(this.messageId);
} else {
this.onTaskHeartbeat.post(message.payload.id);
this.onTaskHeartbeat.post(message.data.payload.id);
}

break;
Expand Down
5 changes: 5 additions & 0 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,11 @@ export const ClientToSharedQueueMessages = {
data: BackgroundWorkerClientMessages,
}),
},
PING: {
message: z.object({
version: z.literal("v1").default("v1"),
}),
},
};

export const SharedQueueToClientMessages = {
Expand Down
Loading
Loading