Skip to content

Commit a974e8d

Browse files
committed
prevent uncaught websocket exceptions
1 parent 9971de6 commit a974e8d

File tree

5 files changed

+131
-31
lines changed

5 files changed

+131
-31
lines changed

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ export class AuthenticatedSocketConnection {
5252

5353
this._messageHandler = new ZodMessageHandler({
5454
schema: clientWebsocketMessages,
55+
logger,
5556
messages: {
5657
READY_FOR_TASKS: async (payload) => {
5758
await this._consumer.registerBackgroundWorker(
@@ -99,9 +100,22 @@ export class AuthenticatedSocketConnection {
99100
}
100101

101102
async #handleMessage(ev: MessageEvent) {
102-
const data = JSON.parse(ev.data.toString());
103-
104-
await this._messageHandler.handleMessage(data);
103+
try {
104+
const data = JSON.parse(ev.data.toString());
105+
106+
await this._messageHandler.handleMessage(data);
107+
} catch (error) {
108+
logger.error("[AuthenticatedSocketConnection] Failed to handle message", {
109+
error:
110+
error instanceof Error
111+
? {
112+
message: error.message,
113+
stack: error.stack,
114+
}
115+
: error,
116+
message: ev.data.toString(),
117+
});
118+
}
105119
}
106120

107121
async #handleClose(ev: CloseEvent) {

apps/webapp/app/v3/sharedSocketConnection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ export class SharedSocketConnection {
9696

9797
this._messageHandler = new ZodMessageHandler({
9898
schema: clientWebsocketMessages,
99+
logger,
99100
messages: {
100101
READY_FOR_TASKS: async (payload) => {
101102
this._sharedQueueConsumerPool.start();

apps/webapp/app/v3/utils/zodPubSub.server.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
3939
this._subscriber = new Redis(_options.redis);
4040
this._messageHandler = new ZodMessageHandler({
4141
schema: _options.schema,
42+
logger: this._logger,
4243
});
4344
}
4445

@@ -76,20 +77,26 @@ class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
7677

7778
const message = this._messageHandler.parseMessage(parsedMessage);
7879

79-
if (typeof message.type !== "string") {
80+
if (!message.success) {
81+
this._logger.error(`Failed to parse message: ${message.error}`, { parsedMessage });
8082
return;
8183
}
8284

83-
const listener = this._listeners.get(message.type);
85+
if (typeof message.data.type !== "string") {
86+
this._logger.error(`Failed to parse message: invalid type`, { parsedMessage });
87+
return;
88+
}
89+
90+
const listener = this._listeners.get(message.data.type);
8491

8592
if (!listener) {
86-
this._logger.debug(`No listener for message type: ${message.type}`, { parsedMessage });
93+
this._logger.debug(`No listener for message type: ${message.data.type}`, { parsedMessage });
8794

8895
return;
8996
}
9097

9198
try {
92-
await listener(message.payload);
99+
await listener(message.data.payload);
93100
} catch (error) {
94101
this._logger.error("Error handling message", { error, message });
95102
}

packages/cli-v3/src/workers/dev/backgroundWorker.ts

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -379,20 +379,32 @@ export class BackgroundWorker {
379379
child.on("message", async (msg: any) => {
380380
const message = this._handler.parseMessage(msg);
381381

382-
if (message.type === "TASKS_READY" && !resolved) {
382+
if (!message.success) {
383383
clearTimeout(timeout);
384384
resolved = true;
385-
resolve(message.payload.tasks);
385+
reject(new Error(`Failed to parse message: ${message.error}`));
386386
child.kill();
387-
} else if (message.type === "UNCAUGHT_EXCEPTION") {
387+
return;
388+
}
389+
390+
if (message.data.type === "TASKS_READY" && !resolved) {
391+
clearTimeout(timeout);
392+
resolved = true;
393+
resolve(message.data.payload.tasks);
394+
child.kill();
395+
} else if (message.data.type === "UNCAUGHT_EXCEPTION") {
388396
clearTimeout(timeout);
389397
resolved = true;
390-
reject(new UncaughtExceptionError(message.payload.error, message.payload.origin));
398+
reject(
399+
new UncaughtExceptionError(message.data.payload.error, message.data.payload.origin)
400+
);
391401
child.kill();
392-
} else if (message.type === "TASKS_FAILED_TO_PARSE") {
402+
} else if (message.data.type === "TASKS_FAILED_TO_PARSE") {
393403
clearTimeout(timeout);
394404
resolved = true;
395-
reject(new TaskMetadataParseError(message.payload.zodIssues, message.payload.tasks));
405+
reject(
406+
new TaskMetadataParseError(message.data.payload.zodIssues, message.data.payload.tasks)
407+
);
396408
child.kill();
397409
}
398410
});
@@ -942,9 +954,14 @@ class TaskRunProcess {
942954
async #handleMessage(msg: any) {
943955
const message = this._handler.parseMessage(msg);
944956

945-
switch (message.type) {
957+
if (!message.success) {
958+
logger.error(`Dropping message: ${message.error}`, { message });
959+
return;
960+
}
961+
962+
switch (message.data.type) {
946963
case "TASK_RUN_COMPLETED": {
947-
const { result, execution } = message.payload;
964+
const { result, execution } = message.data.payload;
948965

949966
logger.debug(`[${this.runId}] task run completed`, {
950967
result,
@@ -981,7 +998,7 @@ class TaskRunProcess {
981998
if (this.messageId) {
982999
this.onTaskRunHeartbeat.post(this.messageId);
9831000
} else {
984-
this.onTaskHeartbeat.post(message.payload.id);
1001+
this.onTaskHeartbeat.post(message.data.payload.id);
9851002
}
9861003

9871004
break;

packages/core/src/v3/zodMessageHandler.ts

Lines changed: 76 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ export type ZodMessageHandlers<TCatalogSchema extends ZodMessageCatalogSchema> =
2525
export type ZodMessageHandlerOptions<TMessageCatalog extends ZodMessageCatalogSchema> = {
2626
schema: TMessageCatalog;
2727
messages?: ZodMessageHandlers<TMessageCatalog>;
28+
logger?: StructuredLogger;
2829
};
2930

3031
export type MessageFromSchema<
@@ -52,53 +53,107 @@ export interface EventEmitterLike {
5253
export class ZodMessageHandler<TMessageCatalog extends ZodMessageCatalogSchema> {
5354
#schema: TMessageCatalog;
5455
#handlers: ZodMessageHandlers<TMessageCatalog> | undefined;
56+
#logger: StructuredLogger | Console;
5557

5658
constructor(options: ZodMessageHandlerOptions<TMessageCatalog>) {
5759
this.#schema = options.schema;
5860
this.#handlers = options.messages;
61+
this.#logger = options.logger ?? console;
5962
}
6063

61-
public async handleMessage(message: unknown) {
64+
public async handleMessage(message: unknown): Promise<
65+
| {
66+
success: true;
67+
data: unknown;
68+
}
69+
| {
70+
success: false;
71+
error: string;
72+
}
73+
> {
6274
const parsedMessage = this.parseMessage(message);
6375

76+
if (!parsedMessage.success) {
77+
this.#logger.error(parsedMessage.error, { message });
78+
79+
return {
80+
success: false,
81+
error: parsedMessage.error,
82+
};
83+
}
84+
6485
if (!this.#handlers) {
65-
throw new Error("No handlers provided");
86+
this.#logger.error("No handlers provided", { message });
87+
88+
return {
89+
success: false,
90+
error: "No handlers provided",
91+
};
6692
}
6793

68-
const handler = this.#handlers[parsedMessage.type];
94+
const handler = this.#handlers[parsedMessage.data.type];
6995

7096
if (!handler) {
71-
console.error(`No handler for message type: ${String(parsedMessage.type)}`);
72-
return;
97+
const error = `No handler for message type: ${String(parsedMessage.data.type)}`;
98+
99+
this.#logger.error(error, { message });
100+
101+
return {
102+
success: false,
103+
error,
104+
};
73105
}
74106

75-
const ack = await handler(parsedMessage.payload);
107+
const ack = await handler(parsedMessage.data.payload);
76108

77-
return ack;
109+
return {
110+
success: true,
111+
data: ack,
112+
};
78113
}
79114

80-
public parseMessage(message: unknown): MessageFromCatalog<TMessageCatalog> {
115+
public parseMessage(message: unknown):
116+
| {
117+
success: true;
118+
data: MessageFromCatalog<TMessageCatalog>;
119+
}
120+
| {
121+
success: false;
122+
error: string;
123+
} {
81124
const parsedMessage = ZodMessageSchema.safeParse(message);
82125

83126
if (!parsedMessage.success) {
84-
throw new Error(`Failed to parse message: ${JSON.stringify(parsedMessage.error)}`);
127+
return {
128+
success: false,
129+
error: `Failed to parse message: ${JSON.stringify(parsedMessage.error)}`,
130+
};
85131
}
86132

87133
const schema = this.#schema[parsedMessage.data.type];
88134

89135
if (!schema) {
90-
throw new Error(`Unknown message type: ${parsedMessage.data.type}`);
136+
return {
137+
success: false,
138+
error: `Unknown message type: ${parsedMessage.data.type}`,
139+
};
91140
}
92141

93142
const parsedPayload = schema.safeParse(parsedMessage.data.payload);
94143

95144
if (!parsedPayload.success) {
96-
throw new Error(`Failed to parse message payload: ${JSON.stringify(parsedPayload.error)}`);
145+
return {
146+
success: false,
147+
error: `Failed to parse message payload: ${JSON.stringify(parsedPayload.error)}`,
148+
};
97149
}
98150

99151
return {
100-
type: parsedMessage.data.type,
101-
payload: parsedPayload.data,
152+
success: true,
153+
data: {
154+
type: parsedMessage.data.type,
155+
payload: parsedPayload.data,
156+
},
102157
};
103158
}
104159

@@ -117,7 +172,7 @@ export class ZodMessageHandler<TMessageCatalog extends ZodMessageCatalogSchema>
117172
hasCallback: !!callback,
118173
});
119174

120-
let ack;
175+
let ack: Awaited<ReturnType<ZodMessageHandler<TMessageCatalog>["handleMessage"]>>;
121176

122177
// FIXME: this only works if the message doesn't have genuine payload prop
123178
if ("payload" in message) {
@@ -129,7 +184,13 @@ export class ZodMessageHandler<TMessageCatalog extends ZodMessageCatalogSchema>
129184
}
130185

131186
if (callback && typeof callback === "function") {
132-
callback(ack);
187+
if (!ack.success) {
188+
// We don't know the callback type, so we can't do anything else - not all callbacks may accept a success prop
189+
log.error("Failed to handle message, skipping callback", { message, error: ack.error });
190+
return;
191+
}
192+
193+
callback(ack.data);
133194
}
134195
});
135196
}

0 commit comments

Comments
 (0)