Skip to content

Commit 71cf0ce

Browse files
committed
Add ttl option when triggering tasks, expire runs after ttl
Dev runs expire in 10m by default
1 parent f202aed commit 71cf0ce

File tree

20 files changed

+255
-22
lines changed

20 files changed

+255
-22
lines changed

apps/webapp/app/components/runs/v3/TaskRunStatus.tsx

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
NoSymbolIcon,
99
PauseCircleIcon,
1010
RectangleStackIcon,
11+
TrashIcon,
1112
XCircleIcon,
1213
} from "@heroicons/react/20/solid";
1314
import { TaskRunStatus } from "@trigger.dev/database";
@@ -30,6 +31,7 @@ export const allTaskRunStatuses = [
3031
"PAUSED",
3132
"INTERRUPTED",
3233
"SYSTEM_FAILURE",
34+
"EXPIRED",
3335
] as const satisfies Readonly<Array<TaskRunStatus>>;
3436

3537
export const filterableTaskRunStatuses = [
@@ -45,6 +47,7 @@ export const filterableTaskRunStatuses = [
4547
"CRASHED",
4648
"INTERRUPTED",
4749
"SYSTEM_FAILURE",
50+
"EXPIRED",
4851
] as const satisfies Readonly<Array<TaskRunStatus>>;
4952

5053
const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
@@ -61,6 +64,7 @@ const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
6164
SYSTEM_FAILURE: "Task has failed due to a system failure",
6265
PAUSED: "Task has been paused by the user",
6366
CRASHED: "Task has crashed and won't be retried",
67+
EXPIRED: "Task has surpassed its ttl and won't be executed",
6468
};
6569

6670
export const QUEUED_STATUSES: TaskRunStatus[] = ["PENDING", "WAITING_FOR_DEPLOY", "DELAYED"];
@@ -78,6 +82,7 @@ export const FINISHED_STATUSES: TaskRunStatus[] = [
7882
"INTERRUPTED",
7983
"SYSTEM_FAILURE",
8084
"CRASHED",
85+
"EXPIRED",
8186
];
8287

8388
export function descriptionForTaskRunStatus(status: TaskRunStatus): string {
@@ -139,6 +144,8 @@ export function TaskRunStatusIcon({
139144
return <BugAntIcon className={cn(runStatusClassNameColor(status), className)} />;
140145
case "CRASHED":
141146
return <FireIcon className={cn(runStatusClassNameColor(status), className)} />;
147+
case "EXPIRED":
148+
return <TrashIcon className={cn(runStatusClassNameColor(status), className)} />;
142149

143150
default: {
144151
assertNever(status);
@@ -161,6 +168,7 @@ export function runStatusClassNameColor(status: TaskRunStatus): string {
161168
case "PAUSED":
162169
return "text-amber-300";
163170
case "CANCELED":
171+
case "EXPIRED":
164172
return "text-charcoal-500";
165173
case "INTERRUPTED":
166174
return "text-error";
@@ -206,6 +214,8 @@ export function runStatusTitle(status: TaskRunStatus): string {
206214
return "System failure";
207215
case "CRASHED":
208216
return "Crashed";
217+
case "EXPIRED":
218+
return "Expired";
209219
default: {
210220
assertNever(status);
211221
}

apps/webapp/app/components/runs/v3/TaskRunsTable.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ export function TaskRunsTable({
119119
<TableHeaderCell>Test</TableHeaderCell>
120120
<TableHeaderCell>Created at</TableHeaderCell>
121121
<TableHeaderCell>Delayed until</TableHeaderCell>
122+
<TableHeaderCell>TTL</TableHeaderCell>
122123
<TableHeaderCell>
123124
<span className="sr-only">Go to page</span>
124125
</TableHeaderCell>
@@ -191,6 +192,7 @@ export function TaskRunsTable({
191192
<TableCell to={path}>
192193
{run.delayUntil ? <DateTime date={run.delayUntil} /> : "–"}
193194
</TableCell>
195+
<TableCell to={path}>{run.ttl ?? "–"}</TableCell>
194196
<RunActionsCell run={run} path={path} />
195197
</TableRow>
196198
);

apps/webapp/app/database-types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export const TaskRunStatus = {
4141
SYSTEM_FAILURE: "SYSTEM_FAILURE",
4242
CRASHED: "CRASHED",
4343
DELAYED: "DELAYED",
44+
EXPIRED: "EXPIRED",
4445
} as const satisfies Record<TaskRunStatusType, TaskRunStatusType>;
4546

4647
export const JobRunStatus = {

apps/webapp/app/models/taskRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export function batchTaskRunItemStatusForRunStatus(
118118
case TaskRunStatus.COMPLETED_WITH_ERRORS:
119119
case TaskRunStatus.SYSTEM_FAILURE:
120120
case TaskRunStatus.CRASHED:
121+
case TaskRunStatus.EXPIRED:
121122
return BatchTaskRunItemStatus.FAILED;
122123
case TaskRunStatus.PENDING:
123124
case TaskRunStatus.WAITING_FOR_DEPLOY:

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
117117
output: $output,
118118
outputPresignedUrl: $outputPresignedUrl,
119119
isTest: taskRun.isTest,
120+
ttl: taskRun.ttl ?? undefined,
121+
expiredAt: taskRun.expiredAt ?? undefined,
120122
schedule: taskRun.schedule
121123
? {
122124
id: taskRun.schedule.friendlyId,
@@ -209,6 +211,9 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
209211
case "COMPLETED_WITH_ERRORS": {
210212
return "FAILED";
211213
}
214+
case "EXPIRED": {
215+
return "EXPIRED";
216+
}
212217
default: {
213218
assertNever(status);
214219
}

apps/webapp/app/presenters/v3/ApiRunListPresenter.server.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,8 @@ export class ApiRunListPresenter extends BasePresenter {
211211
finishedAt: run.finishedAt ? new Date(run.finishedAt) : undefined,
212212
delayedUntil: run.delayUntil ? new Date(run.delayUntil) : undefined,
213213
isTest: run.isTest,
214+
ttl: run.ttl ?? undefined,
215+
expiredAt: run.expiredAt ? new Date(run.expiredAt) : undefined,
214216
env: {
215217
id: run.environment.id,
216218
name: run.environment.slug,
@@ -269,6 +271,9 @@ export class ApiRunListPresenter extends BasePresenter {
269271
case "FAILED": {
270272
return "COMPLETED_WITH_ERRORS";
271273
}
274+
case "EXPIRED": {
275+
return "EXPIRED";
276+
}
272277
default: {
273278
assertNever(status);
274279
}

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ export class RunListPresenter extends BasePresenter {
163163
isTest: boolean;
164164
spanId: string;
165165
idempotencyKey: string | null;
166+
ttl: string | null;
167+
expiredAt: Date | null;
166168
}[]
167169
>`
168170
SELECT
@@ -180,7 +182,9 @@ export class RunListPresenter extends BasePresenter {
180182
tr."updatedAt" AS "updatedAt",
181183
tr."isTest" AS "isTest",
182184
tr."spanId" AS "spanId",
183-
tr."idempotencyKey" AS "idempotencyKey"
185+
tr."idempotencyKey" AS "idempotencyKey",
186+
tr."ttl" AS "ttl",
187+
tr."expiredAt" AS "expiredAt"
184188
FROM
185189
${sqlDatabaseSchema}."TaskRun" tr
186190
LEFT JOIN
@@ -297,6 +301,8 @@ export class RunListPresenter extends BasePresenter {
297301
isCancellable: isCancellableRunStatus(run.status),
298302
environment: displayableEnvironment(environment, userId),
299303
idempotencyKey: run.idempotencyKey ? run.idempotencyKey : undefined,
304+
ttl: run.ttl ? run.ttl : undefined,
305+
expiredAt: run.expiredAt ? run.expiredAt.toISOString() : undefined,
300306
};
301307
}),
302308
pagination: {

apps/webapp/app/services/worker.server.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
4848
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
4949
import { reportUsageEvent } from "~/v3/openMeter.server";
5050
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
51+
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
5152

5253
const workerCatalog = {
5354
indexEndpoint: z.object({
@@ -181,6 +182,9 @@ const workerCatalog = {
181182
"v3.enqueueDelayedRun": z.object({
182183
runId: z.string(),
183184
}),
185+
"v3.expireRun": z.object({
186+
runId: z.string(),
187+
}),
184188
};
185189

186190
const executionWorkerCatalog = {
@@ -682,6 +686,15 @@ function getWorkerQueue() {
682686
handler: async (payload, job) => {
683687
const service = new EnqueueDelayedRunService();
684688

689+
return await service.call(payload.runId);
690+
},
691+
},
692+
"v3.expireRun": {
693+
priority: 0,
694+
maxAttempts: 8,
695+
handler: async (payload, job) => {
696+
const service = new ExpireEnqueuedRunService();
697+
685698
return await service.call(payload.runId);
686699
},
687700
},

apps/webapp/app/v3/requeueTaskRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ export class RequeueTaskRunService extends BaseService {
6969
case "CRASHED":
7070
case "COMPLETED_WITH_ERRORS":
7171
case "COMPLETED_SUCCESSFULLY":
72+
case "EXPIRED":
7273
case "CANCELED": {
7374
logger.debug("[RequeueTaskRunService] Task run is completed", { taskRun });
7475

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { logger } from "~/services/logger.server";
22
import { marqs } from "~/v3/marqs/index.server";
33
import { BaseService } from "./baseService.server";
4+
import { parseNaturalLanguageDuration } from "./triggerTask.server";
5+
import { workerQueue } from "~/services/worker.server";
6+
import { $transaction } from "~/db.server";
47

58
export class EnqueueDelayedRunService extends BaseService {
69
public async call(runId: string) {
@@ -34,14 +37,28 @@ export class EnqueueDelayedRunService extends BaseService {
3437
return;
3538
}
3639

37-
await this._prisma.taskRun.update({
38-
where: {
39-
id: run.id,
40-
},
41-
data: {
42-
status: "PENDING",
43-
queuedAt: new Date(),
44-
},
40+
await $transaction(this._prisma, async (tx) => {
41+
await tx.taskRun.update({
42+
where: {
43+
id: run.id,
44+
},
45+
data: {
46+
status: "PENDING",
47+
queuedAt: new Date(),
48+
},
49+
});
50+
51+
if (run.ttl) {
52+
const expireAt = parseNaturalLanguageDuration(run.ttl);
53+
54+
if (expireAt) {
55+
await workerQueue.enqueue(
56+
"v3.expireRun",
57+
{ runId: run.id },
58+
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${run.id}` }
59+
);
60+
}
61+
}
4562
});
4663

4764
await marqs?.enqueueMessage(
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { logger } from "~/services/logger.server";
2+
import { marqs } from "~/v3/marqs/index.server";
3+
import { BaseService } from "./baseService.server";
4+
5+
export class ExpireEnqueuedRunService extends BaseService {
6+
public async call(runId: string) {
7+
const run = await this._prisma.taskRun.findUnique({
8+
where: {
9+
id: runId,
10+
},
11+
include: {
12+
runtimeEnvironment: {
13+
include: {
14+
organization: true,
15+
project: true,
16+
},
17+
},
18+
},
19+
});
20+
21+
if (!run) {
22+
logger.debug("Could not find enqueued run to expire", {
23+
runId,
24+
});
25+
26+
return;
27+
}
28+
29+
if (run.status !== "PENDING") {
30+
logger.debug("Run cannot be expired because it's not in PENDING status", {
31+
run,
32+
});
33+
34+
return;
35+
}
36+
37+
logger.debug("Expiring enqueued run", {
38+
run,
39+
});
40+
41+
await this._prisma.taskRun.update({
42+
where: {
43+
id: run.id,
44+
},
45+
data: {
46+
status: "EXPIRED",
47+
expiredAt: new Date(),
48+
},
49+
});
50+
51+
await marqs?.acknowledgeMessage(run.id);
52+
}
53+
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { generateFriendlyId } from "../friendlyIdentifiers";
1414
import { uploadToObjectStore } from "../r2.server";
1515
import { startActiveSpan } from "../tracer.server";
1616
import { BaseService } from "./baseService.server";
17-
import { $transaction } from "~/db.server";
1817

1918
export type TriggerTaskServiceOptions = {
2019
idempotencyKey?: string;
@@ -39,6 +38,11 @@ export class TriggerTaskService extends BaseService {
3938
const idempotencyKey = options.idempotencyKey ?? body.options?.idempotencyKey;
4039
const delayUntil = await parseDelay(body.options?.delay);
4140

41+
const ttl =
42+
typeof body.options?.ttl === "number"
43+
? stringifyDuration(body.options?.ttl)
44+
: body.options?.ttl ?? (environment.type === "DEVELOPMENT" ? "10m" : undefined);
45+
4246
const existingRun = idempotencyKey
4347
? await this._prisma.taskRun.findUnique({
4448
where: {
@@ -135,6 +139,7 @@ export class TriggerTaskService extends BaseService {
135139
isTest: body.options?.test ?? false,
136140
delayUntil,
137141
queuedAt: delayUntil ? undefined : new Date(),
142+
ttl,
138143
},
139144
});
140145

@@ -229,6 +234,18 @@ export class TriggerTaskService extends BaseService {
229234
);
230235
}
231236

237+
if (!taskRun.delayUntil && taskRun.ttl) {
238+
const expireAt = parseNaturalLanguageDuration(taskRun.ttl);
239+
240+
if (expireAt) {
241+
await workerQueue.enqueue(
242+
"v3.expireRun",
243+
{ runId: taskRun.id },
244+
{ tx, runAt: expireAt, jobKey: `v3.expireRun.${taskRun.id}` }
245+
);
246+
}
247+
}
248+
232249
return taskRun;
233250
},
234251
async (_, tx) => {
@@ -341,7 +358,7 @@ export async function parseDelay(value?: string | Date): Promise<Date | undefine
341358
}
342359
}
343360

344-
function parseNaturalLanguageDuration(duration: string): Date | undefined {
361+
export function parseNaturalLanguageDuration(duration: string): Date | undefined {
345362
const regexPattern = /^(\d+w)?(\d+d)?(\d+h)?(\d+m)?(\d+s)?$/;
346363

347364
const result: Date = new Date();
@@ -392,3 +409,25 @@ function parseNaturalLanguageDuration(duration: string): Date | undefined {
392409

393410
return undefined;
394411
}
412+
413+
function stringifyDuration(seconds: number): string | undefined {
414+
if (seconds <= 0) {
415+
return;
416+
}
417+
418+
const units = {
419+
w: Math.floor(seconds / 604800),
420+
d: Math.floor((seconds % 604800) / 86400),
421+
h: Math.floor((seconds % 86400) / 3600),
422+
m: Math.floor((seconds % 3600) / 60),
423+
s: Math.floor(seconds % 60),
424+
};
425+
426+
// Filter the units having non-zero values and join them
427+
const result: string = Object.entries(units)
428+
.filter(([unit, val]) => val != 0)
429+
.map(([unit, val]) => `${val}${unit}`)
430+
.join("");
431+
432+
return result;
433+
}

apps/webapp/app/v3/taskStatus.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ export const FINAL_RUN_STATUSES: TaskRunStatus[] = [
3939
"COMPLETED_WITH_ERRORS",
4040
"INTERRUPTED",
4141
"SYSTEM_FAILURE",
42+
"EXPIRED",
4243
];
4344
export const FINAL_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["CANCELED", "COMPLETED", "FAILED"];
4445

docs/images/v3/expired-runs.png

222 KB
Loading

0 commit comments

Comments
 (0)