Skip to content

v3: task payload refactor and configurable default retries #939

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ async function compileProject(config: ResolvedConfig, options: DeployCommandOpti
outdir: "out",
define: {
TRIGGER_API_URL: `"${config.triggerUrl}"`,
__PROJECT_CONFIG__: JSON.stringify(config),
},
});

Expand Down Expand Up @@ -646,6 +647,9 @@ async function compileProject(config: ResolvedConfig, options: DeployCommandOpti
format: "cjs", // This is needed to support opentelemetry instrumentation that uses module patching
target: ["node18", "es2020"],
outdir: "out",
define: {
__PROJECT_CONFIG__: JSON.stringify(config),
},
});

if (entryPointResult.errors.length > 0) {
Expand Down
3 changes: 2 additions & 1 deletion packages/cli-v3/src/commands/dev.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ function useDev({
outdir: "out",
define: {
TRIGGER_API_URL: `"${config.triggerUrl}"`,
__PROJECT_CONFIG__: JSON.stringify(config),
},
plugins: [
{
Expand Down Expand Up @@ -409,7 +410,7 @@ function useDev({
await environmentClient.getEnvironmentVariables(config.project);

const backgroundWorker = new BackgroundWorker(fullPath, {
projectDir: config.projectDir,
projectConfig: config,
dependencies,
env: {
TRIGGER_API_URL: apiUrl,
Expand Down
8 changes: 4 additions & 4 deletions packages/cli-v3/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ import { TaskMetadataWithFilePath } from "@trigger.dev/core/v3";

export type TaskMetadataWithFunctions = TaskMetadataWithFilePath & {
fns: {
run: (params: any) => Promise<any>;
init?: (params: any) => Promise<void>;
cleanup?: (params: any) => Promise<void>;
middleware?: (params: any) => Promise<void>;
run: (payload: any, params: any) => Promise<any>;
init?: (payload: any, params: any) => Promise<void>;
cleanup?: (payload: any, params: any) => Promise<void>;
middleware?: (payload: any, params: any) => Promise<void>;
};
};

Expand Down
22 changes: 17 additions & 5 deletions packages/cli-v3/src/workers/dev/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
BackgroundWorkerProperties,
BackgroundWorkerServerMessages,
CreateBackgroundWorkerResponse,
ResolvedConfig,
SemanticInternalAttributes,
TaskMetadataWithFilePath,
TaskRunBuiltInError,
Expand Down Expand Up @@ -148,14 +149,25 @@ export class BackgroundWorkerCoordinator {

const elapsed = performance.now() - now;

const retryingText =
!completion.ok && completion.skippedRetrying
? " (retrying skipped)"
: !completion.ok && completion.retry !== undefined
? ` (retrying in ${completion.retry.delay}ms)`
: "";

const resultText = !completion.ok
? completion.error.type === "INTERNAL_ERROR" &&
completion.error.code === TaskRunErrorCodes.TASK_EXECUTION_ABORTED
? chalk.yellow("cancelled")
: chalk.red("error")
: chalk.red(`error${retryingText}`)
: chalk.green("success");

const errorText = !completion.ok ? this.#formatErrorLog(completion.error) : "";
const errorText = !completion.ok
? this.#formatErrorLog(completion.error)
: "retry" in completion
? `retry in ${completion.retry}ms`
: "";

const elapsedText = chalk.dim(`(${elapsed.toFixed(2)}ms)`);

Expand Down Expand Up @@ -205,7 +217,7 @@ class CleanupProcessError extends Error {
export type BackgroundWorkerParams = {
env: Record<string, string>;
dependencies?: Record<string, string>;
projectDir: string;
projectConfig: ResolvedConfig;
debuggerOn: boolean;
debugOtel?: boolean;
};
Expand Down Expand Up @@ -434,7 +446,7 @@ export class BackgroundWorker {
): Promise<TaskRunBuiltInError> {
return {
...error,
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectDir),
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectConfig.projectDir),
};
}
}
Expand Down Expand Up @@ -478,7 +490,7 @@ class TaskRunProcess {
env: {
...this.env,
OTEL_RESOURCE_ATTRIBUTES: JSON.stringify({
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectDir,
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectConfig.projectDir,
}),
...(this.worker.debugOtel ? { OTEL_LOG_LEVEL: "debug" } : {}),
},
Expand Down
33 changes: 23 additions & 10 deletions packages/cli-v3/src/workers/dev/worker-facade.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { type TracingSDK } from "@trigger.dev/core/v3";
import { Config, type TracingSDK } from "@trigger.dev/core/v3";
import "source-map-support/register.js";

__WORKER_SETUP__;
declare const __WORKER_SETUP__: unknown;

declare const __PROJECT_CONFIG__: Config;
declare const tracingSDK: TracingSDK;

const otelTracer = tracingSDK.getTracer("trigger-dev-worker", packageJson.version);
Expand Down Expand Up @@ -68,15 +70,23 @@ class TaskExecutor {
async determineRetrying(
execution: TaskRunExecution,
error: unknown
): Promise<TaskRunExecutionRetry | undefined> {
if (!this.task.retry) {
): Promise<TaskRunExecutionRetry | "skipped" | undefined> {
const retry = this.task.retry ?? __PROJECT_CONFIG__.retries?.default;

if (!retry) {
return;
}

const retry = this.task.retry;

const delay = calculateNextRetryDelay(retry, execution.attempt.number);

if (
typeof __PROJECT_CONFIG__.retries?.enabledInDev === "boolean" &&
!__PROJECT_CONFIG__.retries.enabledInDev
) {
// TODO: trigger a warning saying that retries are disabled in dev
return "skipped";
}

return typeof delay === "undefined" ? undefined : { timestamp: Date.now() + delay, delay };
}

Expand Down Expand Up @@ -154,10 +164,10 @@ class TaskExecutor {
}

if (!middlewareFn) {
return runFn({ payload, ctx });
return runFn(payload, { ctx });
}

return middlewareFn({ payload, ctx, next: async () => runFn({ payload, ctx, init }) });
return middlewareFn(payload, { ctx, next: async () => runFn(payload, { ctx, init }) });
}

async #callTaskInit(payload: unknown, ctx: TaskRunContext) {
Expand All @@ -168,7 +178,7 @@ class TaskExecutor {
}

return tracer.startActiveSpan("init", async (span) => {
return await initFn({ payload, ctx });
return await initFn(payload, { ctx });
});
}

Expand All @@ -180,7 +190,7 @@ class TaskExecutor {
}

return tracer.startActiveSpan("cleanup", async (span) => {
return await cleanupFn({ payload, ctx, init });
return await cleanupFn(payload, { ctx, init });
});
}
}
Expand Down Expand Up @@ -292,13 +302,16 @@ const handler = new ZodMessageHandler({
},
});
} catch (e) {
const retryResult = await executor.determineRetrying(execution, e);

return sender.send("TASK_RUN_COMPLETED", {
execution,
result: {
id: execution.attempt.id,
ok: false,
error: parseError(e),
retry: await executor.determineRetrying(execution, e),
retry: typeof retryResult === "object" ? retryResult : undefined,
skippedRetrying: retryResult === "skipped",
},
});
} finally {
Expand Down
7 changes: 4 additions & 3 deletions packages/cli-v3/src/workers/prod/backgroundWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
BackgroundWorkerProperties,
Config,
CreateBackgroundWorkerResponse,
ProdChildToWorkerMessages,
ProdTaskRunExecutionPayload,
Expand Down Expand Up @@ -37,7 +38,7 @@ class CleanupProcessError extends Error {

type BackgroundWorkerParams = {
env: Record<string, string>;
projectDir: string;
projectConfig: Config;
contentHash: string;
debugOtel?: boolean;
};
Expand Down Expand Up @@ -300,7 +301,7 @@ export class ProdBackgroundWorker {
): Promise<TaskRunBuiltInError> {
return {
...error,
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectDir),
stackTrace: correctErrorStackTrace(error.stackTrace, this.params.projectConfig.projectDir),
};
}
}
Expand Down Expand Up @@ -342,7 +343,7 @@ class TaskRunProcess {
env: {
...this.env,
OTEL_RESOURCE_ATTRIBUTES: JSON.stringify({
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectDir,
[SemanticInternalAttributes.PROJECT_DIR]: this.worker.projectConfig.projectDir,
}),
...(this.worker.debugOtel ? { OTEL_LOG_LEVEL: "debug" } : {}),
},
Expand Down
6 changes: 4 additions & 2 deletions packages/cli-v3/src/workers/prod/entry-point.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import {
Config,
CoordinatorToProdWorkerMessages,
ProdWorkerToCoordinatorMessages,
TaskResource,
Expand All @@ -9,6 +10,8 @@ import { createServer } from "node:http";
import { ProdBackgroundWorker } from "./backgroundWorker";
import { UncaughtExceptionError } from "../common/errors";

declare const __PROJECT_CONFIG__: Config;

const HTTP_SERVER_PORT = Number(process.env.HTTP_SERVER_PORT || getRandomPortNumber());
const COORDINATOR_HOST = process.env.COORDINATOR_HOST || "127.0.0.1";
const COORDINATOR_PORT = Number(process.env.COORDINATOR_PORT || 50080);
Expand All @@ -22,7 +25,6 @@ class ProdWorker {
private apiUrl = process.env.TRIGGER_API_URL!;
private apiKey = process.env.TRIGGER_SECRET_KEY!;
private contentHash = process.env.TRIGGER_CONTENT_HASH!;
private projectDir = process.env.TRIGGER_PROJECT_DIR!;
private projectRef = process.env.TRIGGER_PROJECT_REF!;
private envId = process.env.TRIGGER_ENV_ID!;
private attemptId = process.env.TRIGGER_ATTEMPT_ID || "index-only";
Expand All @@ -49,7 +51,7 @@ class ProdWorker {
this.#coordinatorSocket = this.#createCoordinatorSocket();

this.#backgroundWorker = new ProdBackgroundWorker("worker.js", {
projectDir: this.projectDir,
projectConfig: __PROJECT_CONFIG__,
env: {
TRIGGER_API_URL: this.apiUrl,
TRIGGER_SECRET_KEY: this.apiKey,
Expand Down
16 changes: 9 additions & 7 deletions packages/cli-v3/src/workers/prod/worker-facade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ import {
ProdWorkerToChildMessages,
ZodIpcConnection,
type TracingSDK,
Config,
} from "@trigger.dev/core/v3";
import "source-map-support/register.js";

__WORKER_SETUP__;
declare const __WORKER_SETUP__: unknown;
declare const __PROJECT_CONFIG__: Config;
declare const tracingSDK: TracingSDK;

const otelTracer = tracingSDK.getTracer("trigger-prod-worker", packageJson.version);
Expand Down Expand Up @@ -64,12 +66,12 @@ class TaskExecutor {
execution: TaskRunExecution,
error: unknown
): Promise<TaskRunExecutionRetry | undefined> {
if (!this.task.retry) {
const retry = this.task.retry ?? __PROJECT_CONFIG__.retries?.default;

if (!retry) {
return;
}

const retry = this.task.retry;

const delay = calculateNextRetryDelay(retry, execution.attempt.number);

return typeof delay === "undefined" ? undefined : { timestamp: Date.now() + delay, delay };
Expand Down Expand Up @@ -149,10 +151,10 @@ class TaskExecutor {
}

if (!middlewareFn) {
return runFn({ payload, ctx });
return runFn(payload, { ctx });
}

return middlewareFn({ payload, ctx, next: async () => runFn({ payload, ctx, init }) });
return middlewareFn(payload, { ctx, next: async () => runFn(payload, { ctx, init }) });
}

async #callTaskInit(payload: unknown, ctx: TaskRunContext) {
Expand All @@ -163,7 +165,7 @@ class TaskExecutor {
}

return tracer.startActiveSpan("init", async (span) => {
return await initFn({ payload, ctx });
return await initFn(payload, { ctx });
});
}

Expand All @@ -175,7 +177,7 @@ class TaskExecutor {
}

return tracer.startActiveSpan("cleanup", async (span) => {
return await cleanupFn({ payload, ctx, init });
return await cleanupFn(payload, { ctx, init });
});
}
}
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/v3/schemas/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ export const TaskRunFailedExecutionResult = z.object({
id: z.string(),
error: TaskRunError,
retry: TaskRunExecutionRetry.optional(),
skippedRetrying: z.boolean().optional(),
});

export type TaskRunFailedExecutionResult = z.infer<typeof TaskRunFailedExecutionResult>;
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ export const TaskMetadata = z.object({
exportName: z.string(),
packageVersion: z.string(),
queue: QueueOptions.optional(),
retry: RetryOptions.required().optional(),
retry: RetryOptions.optional(),
});

export type TaskMetadata = z.infer<typeof TaskMetadata>;
Expand Down
7 changes: 7 additions & 0 deletions packages/core/src/v3/schemas/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
BackgroundWorkerServerMessages,
ProdTaskRunExecution,
ProdTaskRunExecutionPayload,
RetryOptions,
} from "./messages";
import { TaskResource } from "./resources";

Expand All @@ -14,6 +15,12 @@ export const Config = z.object({
triggerDirectories: z.string().array().optional(),
triggerUrl: z.string().optional(),
projectDir: z.string().optional(),
retries: z
.object({
enabledInDev: z.boolean().default(true),
default: RetryOptions.optional(),
})
.optional(),
});

export type Config = z.infer<typeof Config>;
Expand Down
8 changes: 5 additions & 3 deletions packages/core/src/v3/utils/retries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type RetryOptions } from "../schemas";
import { calculateResetAt as calculateResetAtInternal } from "../../retry";

export const defaultRetryOptions = {
maxAttempts: 10,
maxAttempts: 3,
factor: 2,
minTimeoutInMs: 1000,
maxTimeoutInMs: 60000,
Expand All @@ -11,11 +11,13 @@ export const defaultRetryOptions = {

/**
*
* @param opts
* @param options
* @param attempt - The current attempt number. If the first attempt has failed, this will be 1.
* @returns
*/
export function calculateNextRetryDelay(opts: Required<RetryOptions>, attempt: number) {
export function calculateNextRetryDelay(options: RetryOptions, attempt: number) {
const opts = { ...defaultRetryOptions, ...options };

if (attempt >= opts.maxAttempts) {
return;
}
Expand Down
Loading