Skip to content

Commit e04d448

Browse files
committed
v3: better handle null unicode characters when creating task events and completing a failed attempt
1 parent d48d9a0 commit e04d448

File tree

9 files changed

+180
-18
lines changed

9 files changed

+180
-18
lines changed

.changeset/shaggy-weeks-live.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
v3: sanitize errors with null unicode characters in some places

apps/webapp/app/entry.server.tsx

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,30 @@ function logError(error: unknown, request?: Request) {
179179
}
180180
}
181181

182+
/**
 * Last-resort handler for exceptions that nothing else caught.
 *
 * Logs the error (with extra `code` / `meta` fields when it is a Prisma
 * request error) and then terminates the process with exit code 1.
 */
process.on("uncaughtException", (error, origin) => {
  const isPrismaRequestError =
    error instanceof Prisma.PrismaClientKnownRequestError ||
    error instanceof Prisma.PrismaClientUnknownRequestError;

  if (!isPrismaRequestError) {
    const { name, message, stack } = error;

    logger.error("uncaughtException", {
      error: { name, message, stack },
      origin,
    });
  } else {
    // Prisma request errors are logged with their Prisma-specific fields.
    // NOTE(review): an earlier comment here said Prisma errors should not exit
    // the process, but `process.exit(1)` below runs for every error, Prisma
    // ones included. Confirm which behaviour is intended.
    logger.error("uncaughtException prisma error", {
      error,
      prismaMessage: error.message,
      code: "code" in error ? error.code : undefined,
      meta: "meta" in error ? error.meta : undefined,
      stack: error.stack,
      origin,
    });
  }

  process.exit(1);
});
205+
182206
const sqsEventConsumer = singleton("sqsEventConsumer", getSharedSqsEventConsumer);
183207

184208
export { apiRateLimiter } from "./services/apiRateLimit.server";
@@ -188,6 +212,8 @@ export { registryProxy } from "./v3/registryProxy.server";
188212
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
189213
import { eventLoopMonitor } from "./eventLoopMonitor.server";
190214
import { env } from "./env.server";
215+
import { logger } from "./services/logger.server";
216+
import { Prisma } from "./db.server";
191217

192218
if (env.EVENT_LOOP_MONITOR_ENABLED === "1") {
193219
eventLoopMonitor.enable();

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 74 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ import { singleton } from "~/utils/singleton";
3333
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
3434
import { startActiveSpan } from "./tracer.server";
3535

36+
const MAX_FLUSH_DEPTH = 5;
37+
3638
export type CreatableEvent = Omit<
3739
Prisma.TaskEventCreateInput,
3840
"id" | "createdAt" | "properties" | "metadata" | "style" | "output" | "payload"
@@ -1009,11 +1011,79 @@ export class EventRepository {
10091011
async #flushBatch(batch: CreatableEvent[]) {
10101012
const events = excludePartialEventsWithCorrespondingFullEvent(batch);
10111013

1012-
await this.db.taskEvent.createMany({
1013-
data: events as Prisma.TaskEventCreateManyInput[],
1014-
});
1014+
const flushedEvents = await this.#doFlushBatch(events);
1015+
1016+
if (flushedEvents.length !== events.length) {
1017+
logger.debug("[EventRepository][flushBatch] Failed to insert all events", {
1018+
attemptCount: events.length,
1019+
successCount: flushedEvents.length,
1020+
});
1021+
}
1022+
1023+
this.#publishToRedis(flushedEvents);
1024+
}
10151025

1016-
this.#publishToRedis(events);
1026+
/**
 * Inserts `events` with a single `createMany`. When that fails with a
 * `PrismaClientUnknownRequestError`, the batch is split in half and each half
 * is retried recursively, so the events that can be stored still are.
 *
 * @param events - Events to insert.
 * @param depth - Current recursion level; the initial call uses 1.
 * @returns The events that were inserted. A single event that fails on its
 *   own, or a multi-event batch that still fails once `depth` exceeds
 *   `MAX_FLUSH_DEPTH`, is logged and left out of the result.
 * @throws Rethrows any error that is not a `PrismaClientUnknownRequestError`.
 */
async #doFlushBatch(events: CreatableEvent[], depth: number = 1): Promise<CreatableEvent[]> {
  try {
    await this.db.taskEvent.createMany({
      data: events as Prisma.TaskEventCreateManyInput[],
    });
  } catch (error) {
    // Only the "unknown request" failure is handled here; everything else propagates.
    if (!(error instanceof Prisma.PrismaClientUnknownRequestError)) {
      throw error;
    }

    const failure = error;
    // Builds a fresh, plain description of the failure for each log call.
    const describeFailure = () => ({
      name: failure.name,
      message: failure.message,
      stack: failure.stack,
      clientVersion: failure.clientVersion,
    });

    logger.error("Failed to insert events, most likely because of null characters", {
      error: describeFailure(),
    });

    // A batch of one cannot be split further: report the offending event and give up on it.
    if (events.length === 1) {
      logger.debug("Attempting to insert event individually and it failed", {
        event: events[0],
        error: describeFailure(),
      });

      return [];
    }

    // Stop bisecting once the recursion budget is used up.
    if (depth > MAX_FLUSH_DEPTH) {
      logger.error("Failed to insert events, reached maximum depth", {
        error: describeFailure(),
        depth,
        eventsCount: events.length,
      });

      return [];
    }

    // Bisect and retry both halves concurrently.
    const splitAt = Math.floor(events.length / 2);
    const halves = [events.slice(0, splitAt), events.slice(splitAt)];

    const [insertedLeft, insertedRight] = await Promise.all(
      halves.map((half) => this.#doFlushBatch(half, depth + 1))
    );

    return [...insertedLeft, ...insertedRight];
  }

  return events;
}
10181088

10191089
async #publishToRedis(events: CreatableEvent[]) {

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
TaskRunFailedExecutionResult,
77
TaskRunSuccessfulExecutionResult,
88
flattenAttributes,
9+
sanitizeError,
910
} from "@trigger.dev/core/v3";
1011
import { PrismaClientOrTransaction } from "~/db.server";
1112
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
@@ -166,12 +167,14 @@ export class CompleteAttemptService extends BaseService {
166167
return "COMPLETED";
167168
}
168169

170+
const sanitizedError = sanitizeError(completion.error);
171+
169172
await this._prisma.taskRunAttempt.update({
170173
where: { id: taskRunAttempt.id },
171174
data: {
172175
status: "FAILED",
173176
completedAt: new Date(),
174-
error: completion.error,
177+
error: sanitizedError,
175178
usageDurationMs: completion.usage?.durationMs,
176179
},
177180
});
@@ -289,15 +292,15 @@ export class CompleteAttemptService extends BaseService {
289292
name: "exception",
290293
time: new Date(),
291294
properties: {
292-
exception: createExceptionPropertiesFromError(completion.error),
295+
exception: createExceptionPropertiesFromError(sanitizedError),
293296
},
294297
},
295298
],
296299
});
297300

298301
if (
299-
completion.error.type === "INTERNAL_ERROR" &&
300-
completion.error.code === "GRACEFUL_EXIT_TIMEOUT"
302+
sanitizedError.type === "INTERNAL_ERROR" &&
303+
sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT"
301304
) {
302305
// We need to fail all incomplete spans
303306
const inProgressEvents = await eventRepository.queryIncompleteEvents({
@@ -310,7 +313,7 @@ export class CompleteAttemptService extends BaseService {
310313

311314
const exception = {
312315
type: "Graceful exit timeout",
313-
message: completion.error.message,
316+
message: sanitizedError.message,
314317
};
315318

316319
await Promise.all(

packages/core/src/v3/errors.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,40 @@ export function createJsonErrorObject(error: TaskRunError): SerializedError {
9595
}
9696
}
9797

98+
/**
 * Returns a copy of `error` with every NUL character (`\0`) removed from its
 * string fields (`message`, `name`, `stackTrace` or `raw`, depending on the
 * variant). The input object is not mutated.
 *
 * @param error - The task run error to sanitize.
 * @returns A new object for the four known variants. An error whose `type` is
 *   not one of them is returned unchanged rather than being turned into
 *   `undefined`, so callers that read `.type` or `.message` on the result
 *   keep working.
 */
export function sanitizeError(error: TaskRunError): TaskRunError {
  // Global regex shared by every branch; `String.prototype.replace` resets
  // `lastIndex` itself, so reuse is safe.
  const nullChars = /\0/g;

  switch (error.type) {
    case "BUILT_IN_ERROR": {
      return {
        type: "BUILT_IN_ERROR",
        message: error.message?.replace(nullChars, ""),
        name: error.name?.replace(nullChars, ""),
        stackTrace: error.stackTrace?.replace(nullChars, ""),
      };
    }
    case "STRING_ERROR": {
      return {
        type: "STRING_ERROR",
        raw: error.raw.replace(nullChars, ""),
      };
    }
    case "CUSTOM_ERROR": {
      return {
        type: "CUSTOM_ERROR",
        raw: error.raw.replace(nullChars, ""),
      };
    }
    case "INTERNAL_ERROR": {
      return {
        type: "INTERNAL_ERROR",
        code: error.code,
        message: error.message?.replace(nullChars, ""),
        stackTrace: error.stackTrace?.replace(nullChars, ""),
      };
    }
    default: {
      // Not reachable for well-typed input. At runtime an unrecognised variant
      // is passed through untouched instead of implicitly returning `undefined`.
      return error;
    }
  }
}
131+
98132
export function correctErrorStackTrace(
99133
stackTrace: string,
100134
projectDir?: string,

packages/core/src/v3/otel/index.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,21 @@ export { TracingSDK, type TracingSDKConfig, type TracingDiagnosticLogLevel } fro
44

55
/**
 * Records `error` on `span` with NUL characters (`\0`) stripped, then marks
 * the span status as ERROR.
 *
 * - `Error` instances are recorded as a sanitized copy.
 * - Strings are recorded as sanitized strings.
 * - Anything else is recorded as a sanitized string representation.
 *
 * @param span - The span to record the exception on.
 * @param error - The thrown value; may be of any type.
 */
export function recordSpanException(span: Span, error: unknown) {
  if (error instanceof Error) {
    span.recordException(sanitizeSpanError(error));
  } else if (typeof error === "string") {
    span.recordException(error.replace(/\0/g, ""));
  } else {
    // `JSON.stringify` alone is not safe here: it returns `undefined` for
    // `undefined`, functions and symbols, and throws on circular structures
    // and BigInt values. Recording an exception must never throw itself.
    span.recordException(stringifyThrownValue(error).replace(/\0/g, ""));
  }

  span.setStatus({ code: SpanStatusCode.ERROR });
}

/** Best-effort, never-throwing string form of an arbitrary thrown value. */
function stringifyThrownValue(value: unknown): string {
  try {
    const json = JSON.stringify(value);

    if (typeof json === "string") {
      return json;
    }
  } catch {
    // Fall through to the generic conversions below.
  }

  try {
    return String(value);
  } catch {
    // e.g. objects without a usable toString / Symbol.toPrimitive
    return Object.prototype.toString.call(value);
  }
}
16+
17+
/**
 * Builds a new plain `Error` that mirrors the name, message and stack of
 * `error`, with every NUL character (`\0`) removed. The original error object
 * is left untouched.
 */
function sanitizeSpanError(error: Error) {
  const withoutNulls = (text: string) => text.replace(/\0/g, "");

  const copy = new Error(withoutNulls(error.message));
  copy.name = withoutNulls(error.name);
  // Keep a missing stack missing rather than inheriting the copy's own stack.
  copy.stack = error.stack == null ? undefined : withoutNulls(error.stack);

  return copy;
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { SpanKind } from "@opentelemetry/api";
22
import { ConsoleInterceptor } from "../consoleInterceptor";
3-
import { parseError } from "../errors";
3+
import { parseError, sanitizeError } from "../errors";
44
import { TracingSDK, recordSpanException } from "../otel";
55
import {
66
BackgroundWorkerProperties,
@@ -169,9 +169,11 @@ export class TaskExecutor {
169169
return {
170170
id: execution.run.id,
171171
ok: false,
172-
error: handleErrorResult.error
173-
? parseError(handleErrorResult.error)
174-
: parseError(runError),
172+
error: sanitizeError(
173+
handleErrorResult.error
174+
? parseError(handleErrorResult.error)
175+
: parseError(runError)
176+
),
175177
retry: handleErrorResult.status === "retry" ? handleErrorResult.retry : undefined,
176178
skippedRetrying: handleErrorResult.status === "skipped",
177179
} satisfies TaskRunExecutionResult;

references/v3-catalog/src/trigger/other.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,18 @@ export const oomTask = task({
107107
}
108108
},
109109
});
110+
111+
/**
 * Reference task for exercising NUL-character handling: it either throws an
 * error whose message contains NUL characters (when `forceError` is set) or
 * returns an object whose values are NUL characters written with three
 * different escape syntaxes.
 */
export const returnZeroCharacters = task({
  id: "return-zero-characters",
  run: async (payload: { forceError?: boolean }) => {
    if (!payload.forceError) {
      return {
        unicode: "\u0000",
        hex: "\x00",
        octal: "\0",
      };
    }

    throw new Error("All zeros: \u0000\x00\0");
  },
});

references/v3-catalog/trigger.config.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,5 @@ export const config: TriggerConfig = {
6161
},
6262
onFailure: async (payload, error, { ctx }) => {
6363
console.log(`Task ${ctx.task.id} failed ${ctx.run.id}`);
64-
65-
throw error;
6664
},
6765
};

0 commit comments

Comments
 (0)