Skip to content

Commit 84a0336

Browse files
authored
Fix #1955: schemaTask parse payload failures handled gracefully (#2043)
1 parent 7be07b4 commit 84a0336

File tree

2 files changed

+58
-3
lines changed

2 files changed

+58
-3
lines changed

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,21 @@ export class TaskExecutor {
143143
);
144144
}
145145

146-
parsedPayload = await this.#parsePayload(payloadResult);
146+
const [parsePayloadError, parsedPayloadResult] = await tryCatch(
147+
this.#parsePayload(payloadResult)
148+
);
149+
150+
if (parsePayloadError) {
151+
recordSpanException(span, parsePayloadError);
152+
return this.#internalErrorResult(
153+
execution,
154+
TaskRunErrorCodes.TASK_INPUT_ERROR,
155+
parsePayloadError,
156+
true
157+
);
158+
}
159+
160+
parsedPayload = parsedPayloadResult;
147161

148162
lifecycleHooks.registerOnWaitHookListener(async (wait) => {
149163
await this.#callOnWaitFunctions(wait, parsedPayload, ctx, initOutput, signal);
@@ -1369,7 +1383,12 @@ export class TaskExecutor {
13691383
});
13701384
}
13711385

1372-
#internalErrorResult(execution: TaskRunExecution, code: TaskRunErrorCodes, error: unknown) {
1386+
#internalErrorResult(
1387+
execution: TaskRunExecution,
1388+
code: TaskRunErrorCodes,
1389+
error: unknown,
1390+
skippedRetrying?: boolean
1391+
) {
13731392
return {
13741393
ok: false,
13751394
id: execution.run.id,
@@ -1384,6 +1403,7 @@ export class TaskExecutor {
13841403
: undefined,
13851404
stackTrace: error instanceof Error ? error.stack : undefined,
13861405
},
1406+
skippedRetrying,
13871407
} satisfies TaskRunExecutionResult;
13881408
}
13891409

packages/core/test/taskExecutor.test.ts

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1809,6 +1809,37 @@ describe("TaskExecutor", () => {
18091809
expect(delay).toBeGreaterThan(29900); // Allow for some time passing during test
18101810
expect(delay).toBeLessThan(32000); // Account for max 2000ms jitter
18111811
});
1812+
1813+
test("should return error and skip retrying if parsePayload throws", async () => {
1814+
const parseError = new Error("Parse failed");
1815+
const task = {
1816+
id: "test-task",
1817+
fns: {
1818+
run: async () => {
1819+
throw new Error("Should not reach run");
1820+
},
1821+
parsePayload: async () => {
1822+
throw parseError;
1823+
},
1824+
},
1825+
};
1826+
1827+
const result = await executeTask(task, { foo: "bar" }, undefined);
1828+
1829+
expect(result).toEqual({
1830+
result: {
1831+
ok: false,
1832+
id: "test-run-id",
1833+
error: {
1834+
type: "INTERNAL_ERROR",
1835+
code: TaskRunErrorCodes.TASK_INPUT_ERROR,
1836+
message: "TaskPayloadParsedError: Parsing payload with schema failed: Parse failed",
1837+
stackTrace: expect.any(String),
1838+
},
1839+
skippedRetrying: true,
1840+
},
1841+
});
1842+
});
18121843
});
18131844

18141845
function executeTask(
@@ -1828,7 +1859,11 @@ function executeTask(
18281859
logger: tracingSDK.getLogger("test-task"),
18291860
});
18301861

1831-
const consoleInterceptor = new ConsoleInterceptor(tracingSDK.getLogger("test-task"), false);
1862+
const consoleInterceptor = new ConsoleInterceptor(
1863+
tracingSDK.getLogger("test-task"),
1864+
false,
1865+
false
1866+
);
18321867

18331868
const executor = new TaskExecutor(task, {
18341869
tracingSDK,

0 commit comments

Comments
 (0)