Skip to content

Commit d7cc2c9

Browse files
authored
v3: task payload refactor and configurable default retries (#939)
* Make the payload parameter the first argument to the task functions * Allow configuring default retry behavior, and disabling in dev
1 parent 851b176 commit d7cc2c9

File tree

26 files changed

+135
-97
lines changed

26 files changed

+135
-97
lines changed

packages/cli-v3/src/commands/deploy.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -613,6 +613,7 @@ async function compileProject(config: ResolvedConfig, options: DeployCommandOpti
613613
outdir: "out",
614614
define: {
615615
TRIGGER_API_URL: `"${config.triggerUrl}"`,
616+
__PROJECT_CONFIG__: JSON.stringify(config),
616617
},
617618
});
618619

@@ -646,6 +647,9 @@ async function compileProject(config: ResolvedConfig, options: DeployCommandOpti
646647
format: "cjs", // This is needed to support opentelemetry instrumentation that uses module patching
647648
target: ["node18", "es2020"],
648649
outdir: "out",
650+
define: {
651+
__PROJECT_CONFIG__: JSON.stringify(config),
652+
},
649653
});
650654

651655
if (entryPointResult.errors.length > 0) {

packages/cli-v3/src/commands/dev.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ function useDev({
335335
outdir: "out",
336336
define: {
337337
TRIGGER_API_URL: `"${config.triggerUrl}"`,
338+
__PROJECT_CONFIG__: JSON.stringify(config),
338339
},
339340
plugins: [
340341
{
@@ -409,7 +410,7 @@ function useDev({
409410
await environmentClient.getEnvironmentVariables(config.project);
410411

411412
const backgroundWorker = new BackgroundWorker(fullPath, {
412-
projectDir: config.projectDir,
413+
projectConfig: config,
413414
dependencies,
414415
env: {
415416
TRIGGER_API_URL: apiUrl,

packages/cli-v3/src/types.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ import { TaskMetadataWithFilePath } from "@trigger.dev/core/v3";
22

33
export type TaskMetadataWithFunctions = TaskMetadataWithFilePath & {
44
fns: {
5-
run: (params: any) => Promise<any>;
6-
init?: (params: any) => Promise<void>;
7-
cleanup?: (params: any) => Promise<void>;
8-
middleware?: (params: any) => Promise<void>;
5+
run: (payload: any, params: any) => Promise<any>;
6+
init?: (payload: any, params: any) => Promise<void>;
7+
cleanup?: (payload: any, params: any) => Promise<void>;
8+
middleware?: (payload: any, params: any) => Promise<void>;
99
};
1010
};
1111

packages/cli-v3/src/workers/dev/backgroundWorker.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import {
22
BackgroundWorkerProperties,
33
BackgroundWorkerServerMessages,
44
CreateBackgroundWorkerResponse,
5+
ResolvedConfig,
56
SemanticInternalAttributes,
67
TaskMetadataWithFilePath,
78
TaskRunBuiltInError,
@@ -148,14 +149,25 @@ export class BackgroundWorkerCoordinator {
148149

149150
const elapsed = performance.now() - now;
150151

152+
const retryingText =
153+
!completion.ok && completion.skippedRetrying
154+
? " (retrying skipped)"
155+
: !completion.ok && completion.retry !== undefined
156+
? ` (retrying in ${completion.retry.delay}ms)`
157+
: "";
158+
151159
const resultText = !completion.ok
152160
? completion.error.type === "INTERNAL_ERROR" &&
153161
completion.error.code === TaskRunErrorCodes.TASK_EXECUTION_ABORTED
154162
? chalk.yellow("cancelled")
155-
: chalk.red("error")
163+
: chalk.red(`error${retryingText}`)
156164
: chalk.green("success");
157165

158-
const errorText = !completion.ok ? this.#formatErrorLog(completion.error) : "";
166+
const errorText = !completion.ok
167+
? this.#formatErrorLog(completion.error)
168+
: "retry" in completion
169+
? `retry in ${completion.retry}ms`
170+
: "";
159171

160172
const elapsedText = chalk.dim(`(${elapsed.toFixed(2)}ms)`);
161173

@@ -205,7 +217,7 @@ class CleanupProcessError extends Error {
205217
export type BackgroundWorkerParams = {
206218
env: Record<string, string>;
207219
dependencies?: Record<string, string>;
208-
projectDir: string;
220+
projectConfig: ResolvedConfig;
209221
debuggerOn: boolean;
210222
debugOtel?: boolean;
211223
};
@@ -434,7 +446,7 @@ export class BackgroundWorker {
434446
): Promise<TaskRunBuiltInError> {
435447
return {
436448
...error,
437-
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectDir),
449+
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectConfig.projectDir),
438450
};
439451
}
440452
}
@@ -478,7 +490,7 @@ class TaskRunProcess {
478490
env: {
479491
...this.env,
480492
OTEL_RESOURCE_ATTRIBUTES: JSON.stringify({
481-
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectDir,
493+
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectConfig.projectDir,
482494
}),
483495
...(this.worker.debugOtel ? { OTEL_LOG_LEVEL: "debug" } : {}),
484496
},

packages/cli-v3/src/workers/dev/worker-facade.ts

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1-
import { type TracingSDK } from "@trigger.dev/core/v3";
1+
import { Config, type TracingSDK } from "@trigger.dev/core/v3";
22
import "source-map-support/register.js";
33

44
__WORKER_SETUP__;
55
declare const __WORKER_SETUP__: unknown;
6+
7+
declare const __PROJECT_CONFIG__: Config;
68
declare const tracingSDK: TracingSDK;
79

810
const otelTracer = tracingSDK.getTracer("trigger-dev-worker", packageJson.version);
@@ -68,15 +70,23 @@ class TaskExecutor {
6870
async determineRetrying(
6971
execution: TaskRunExecution,
7072
error: unknown
71-
): Promise<TaskRunExecutionRetry | undefined> {
72-
if (!this.task.retry) {
73+
): Promise<TaskRunExecutionRetry | "skipped" | undefined> {
74+
const retry = this.task.retry ?? __PROJECT_CONFIG__.retries?.default;
75+
76+
if (!retry) {
7377
return;
7478
}
7579

76-
const retry = this.task.retry;
77-
7880
const delay = calculateNextRetryDelay(retry, execution.attempt.number);
7981

82+
if (
83+
typeof __PROJECT_CONFIG__.retries?.enabledInDev === "boolean" &&
84+
!__PROJECT_CONFIG__.retries.enabledInDev
85+
) {
86+
// TODO: trigger a warning saying that retries are disabled in dev
87+
return "skipped";
88+
}
89+
8090
return typeof delay === "undefined" ? undefined : { timestamp: Date.now() + delay, delay };
8191
}
8292

@@ -154,10 +164,10 @@ class TaskExecutor {
154164
}
155165

156166
if (!middlewareFn) {
157-
return runFn({ payload, ctx });
167+
return runFn(payload, { ctx });
158168
}
159169

160-
return middlewareFn({ payload, ctx, next: async () => runFn({ payload, ctx, init }) });
170+
return middlewareFn(payload, { ctx, next: async () => runFn(payload, { ctx, init }) });
161171
}
162172

163173
async #callTaskInit(payload: unknown, ctx: TaskRunContext) {
@@ -168,7 +178,7 @@ class TaskExecutor {
168178
}
169179

170180
return tracer.startActiveSpan("init", async (span) => {
171-
return await initFn({ payload, ctx });
181+
return await initFn(payload, { ctx });
172182
});
173183
}
174184

@@ -180,7 +190,7 @@ class TaskExecutor {
180190
}
181191

182192
return tracer.startActiveSpan("cleanup", async (span) => {
183-
return await cleanupFn({ payload, ctx, init });
193+
return await cleanupFn(payload, { ctx, init });
184194
});
185195
}
186196
}
@@ -292,13 +302,16 @@ const handler = new ZodMessageHandler({
292302
},
293303
});
294304
} catch (e) {
305+
const retryResult = await executor.determineRetrying(execution, e);
306+
295307
return sender.send("TASK_RUN_COMPLETED", {
296308
execution,
297309
result: {
298310
id: execution.attempt.id,
299311
ok: false,
300312
error: parseError(e),
301-
retry: await executor.determineRetrying(execution, e),
313+
retry: typeof retryResult === "object" ? retryResult : undefined,
314+
skippedRetrying: retryResult === "skipped",
302315
},
303316
});
304317
} finally {

packages/cli-v3/src/workers/prod/backgroundWorker.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
BackgroundWorkerProperties,
3+
Config,
34
CreateBackgroundWorkerResponse,
45
ProdChildToWorkerMessages,
56
ProdTaskRunExecutionPayload,
@@ -37,7 +38,7 @@ class CleanupProcessError extends Error {
3738

3839
type BackgroundWorkerParams = {
3940
env: Record<string, string>;
40-
projectDir: string;
41+
projectConfig: Config;
4142
contentHash: string;
4243
debugOtel?: boolean;
4344
};
@@ -300,7 +301,7 @@ export class ProdBackgroundWorker {
300301
): Promise<TaskRunBuiltInError> {
301302
return {
302303
...error,
303-
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectDir),
304+
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectConfig.projectDir),
304305
};
305306
}
306307
}
@@ -342,7 +343,7 @@ class TaskRunProcess {
342343
env: {
343344
...this.env,
344345
OTEL_RESOURCE_ATTRIBUTES: JSON.stringify({
345-
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectDir,
346+
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectConfig.projectDir,
346347
}),
347348
...(this.worker.debugOtel ? { OTEL_LOG_LEVEL: "debug" } : {}),
348349
},

packages/cli-v3/src/workers/prod/entry-point.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
Config,
23
CoordinatorToProdWorkerMessages,
34
ProdWorkerToCoordinatorMessages,
45
TaskResource,
@@ -9,6 +10,8 @@ import { createServer } from "node:http";
910
import { ProdBackgroundWorker } from "./backgroundWorker";
1011
import { UncaughtExceptionError } from "../common/errors";
1112

13+
declare const __PROJECT_CONFIG__: Config;
14+
1215
const HTTP_SERVER_PORT = Number(process.env.HTTP_SERVER_PORT || getRandomPortNumber());
1316
const COORDINATOR_HOST = process.env.COORDINATOR_HOST || "127.0.0.1";
1417
const COORDINATOR_PORT = Number(process.env.COORDINATOR_PORT || 50080);
@@ -22,7 +25,6 @@ class ProdWorker {
2225
private apiUrl = process.env.TRIGGER_API_URL!;
2326
private apiKey = process.env.TRIGGER_SECRET_KEY!;
2427
private contentHash = process.env.TRIGGER_CONTENT_HASH!;
25-
private projectDir = process.env.TRIGGER_PROJECT_DIR!;
2628
private projectRef = process.env.TRIGGER_PROJECT_REF!;
2729
private envId = process.env.TRIGGER_ENV_ID!;
2830
private attemptId = process.env.TRIGGER_ATTEMPT_ID || "index-only";
@@ -49,7 +51,7 @@ class ProdWorker {
4951
this.#coordinatorSocket = this.#createCoordinatorSocket();
5052

5153
this.#backgroundWorker = new ProdBackgroundWorker("worker.js", {
52-
projectDir: this.projectDir,
54+
projectConfig: __PROJECT_CONFIG__,
5355
env: {
5456
TRIGGER_API_URL: this.apiUrl,
5557
TRIGGER_SECRET_KEY: this.apiKey,

packages/cli-v3/src/workers/prod/worker-facade.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ import {
33
ProdWorkerToChildMessages,
44
ZodIpcConnection,
55
type TracingSDK,
6+
Config,
67
} from "@trigger.dev/core/v3";
78
import "source-map-support/register.js";
89

910
__WORKER_SETUP__;
1011
declare const __WORKER_SETUP__: unknown;
12+
declare const __PROJECT_CONFIG__: Config;
1113
declare const tracingSDK: TracingSDK;
1214

1315
const otelTracer = tracingSDK.getTracer("trigger-prod-worker", packageJson.version);
@@ -64,12 +66,12 @@ class TaskExecutor {
6466
execution: TaskRunExecution,
6567
error: unknown
6668
): Promise<TaskRunExecutionRetry | undefined> {
67-
if (!this.task.retry) {
69+
const retry = this.task.retry ?? __PROJECT_CONFIG__.retries?.default;
70+
71+
if (!retry) {
6872
return;
6973
}
7074

71-
const retry = this.task.retry;
72-
7375
const delay = calculateNextRetryDelay(retry, execution.attempt.number);
7476

7577
return typeof delay === "undefined" ? undefined : { timestamp: Date.now() + delay, delay };
@@ -149,10 +151,10 @@ class TaskExecutor {
149151
}
150152

151153
if (!middlewareFn) {
152-
return runFn({ payload, ctx });
154+
return runFn(payload, { ctx });
153155
}
154156

155-
return middlewareFn({ payload, ctx, next: async () => runFn({ payload, ctx, init }) });
157+
return middlewareFn(payload, { ctx, next: async () => runFn(payload, { ctx, init }) });
156158
}
157159

158160
async #callTaskInit(payload: unknown, ctx: TaskRunContext) {
@@ -163,7 +165,7 @@ class TaskExecutor {
163165
}
164166

165167
return tracer.startActiveSpan("init", async (span) => {
166-
return await initFn({ payload, ctx });
168+
return await initFn(payload, { ctx });
167169
});
168170
}
169171

@@ -175,7 +177,7 @@ class TaskExecutor {
175177
}
176178

177179
return tracer.startActiveSpan("cleanup", async (span) => {
178-
return await cleanupFn({ payload, ctx, init });
180+
return await cleanupFn(payload, { ctx, init });
179181
});
180182
}
181183
}

packages/core/src/v3/schemas/common.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ export const TaskRunFailedExecutionResult = z.object({
163163
id: z.string(),
164164
error: TaskRunError,
165165
retry: TaskRunExecutionRetry.optional(),
166+
skippedRetrying: z.boolean().optional(),
166167
});
167168

168169
export type TaskRunFailedExecutionResult = z.infer<typeof TaskRunFailedExecutionResult>;

packages/core/src/v3/schemas/messages.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ export const TaskMetadata = z.object({
176176
exportName: z.string(),
177177
packageVersion: z.string(),
178178
queue: QueueOptions.optional(),
179-
retry: RetryOptions.required().optional(),
179+
retry: RetryOptions.optional(),
180180
});
181181

182182
export type TaskMetadata = z.infer<typeof TaskMetadata>;

packages/core/src/v3/schemas/schemas.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
BackgroundWorkerServerMessages,
77
ProdTaskRunExecution,
88
ProdTaskRunExecutionPayload,
9+
RetryOptions,
910
} from "./messages";
1011
import { TaskResource } from "./resources";
1112

@@ -14,6 +15,12 @@ export const Config = z.object({
1415
triggerDirectories: z.string().array().optional(),
1516
triggerUrl: z.string().optional(),
1617
projectDir: z.string().optional(),
18+
retries: z
19+
.object({
20+
enabledInDev: z.boolean().default(true),
21+
default: RetryOptions.optional(),
22+
})
23+
.optional(),
1724
});
1825

1926
export type Config = z.infer<typeof Config>;

packages/core/src/v3/utils/retries.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { type RetryOptions } from "../schemas";
22
import { calculateResetAt as calculateResetAtInternal } from "../../retry";
33

44
export const defaultRetryOptions = {
5-
maxAttempts: 10,
5+
maxAttempts: 3,
66
factor: 2,
77
minTimeoutInMs: 1000,
88
maxTimeoutInMs: 60000,
@@ -11,11 +11,13 @@ export const defaultRetryOptions = {
1111

1212
/**
1313
*
14-
* @param opts
14+
* @param options
1515
* @param attempt - The current attempt number. If the first attempt has failed, this will be 1.
1616
* @returns
1717
*/
18-
export function calculateNextRetryDelay(opts: Required<RetryOptions>, attempt: number) {
18+
export function calculateNextRetryDelay(options: RetryOptions, attempt: number) {
19+
const opts = { ...defaultRetryOptions, ...options };
20+
1921
if (attempt >= opts.maxAttempts) {
2022
return;
2123
}

0 commit comments

Comments
 (0)