Skip to content

Commit bb38261

Browse files
authored
FinalizeRunService (#1250)
* WIP notes on each location where we’ll use finalize
* Initial FinalizeTaskRunService
* ExpireEnqueuedRunService uses FinalizeTaskRunService
* FailedTaskRunService uses FinalizeTaskRunService
* Allow passing in an include when finalizing the run
* CrashTaskRunService using FinalizeTaskRunService
* Remove comments
* Status is optional
* CancelAttemptService using FinalizeTaskRunService
* Import tidy
* CancelTaskRunService using FinalizeTaskRunService
* Import tidying
* CompleteAttemptService system failure switched to FinalizeTaskRunService
* Added more logging to Finalizing
* CompleteAttemptStatus COMPLETED_SUCCESSFULLY
* CompletedAttempt “SYSTEM_FAILURE”
* CompletedService final pair
* Use satisfies so we can derive types from the groups
* Only allow final states to be used with this service
* BaseService tx support, minor improvements
1 parent 0d4e3e7 commit bb38261

File tree

8 files changed

+175
-144
lines changed

8 files changed

+175
-144
lines changed

apps/webapp/app/components/runs/v3/TaskRunStatus.tsx

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,23 +67,29 @@ const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
6767
EXPIRED: "Task has surpassed its ttl and won't be executed",
6868
};
6969

70-
export const QUEUED_STATUSES: TaskRunStatus[] = ["PENDING", "WAITING_FOR_DEPLOY", "DELAYED"];
70+
export const QUEUED_STATUSES = [
71+
"PENDING",
72+
"WAITING_FOR_DEPLOY",
73+
"DELAYED",
74+
] satisfies TaskRunStatus[];
7175

72-
export const RUNNING_STATUSES: TaskRunStatus[] = [
76+
export const RUNNING_STATUSES = [
7377
"EXECUTING",
7478
"RETRYING_AFTER_FAILURE",
7579
"WAITING_TO_RESUME",
76-
];
80+
] satisfies TaskRunStatus[];
7781

78-
export const FINISHED_STATUSES: TaskRunStatus[] = [
82+
export const FINISHED_STATUSES = [
7983
"COMPLETED_SUCCESSFULLY",
8084
"CANCELED",
8185
"COMPLETED_WITH_ERRORS",
8286
"INTERRUPTED",
8387
"SYSTEM_FAILURE",
8488
"CRASHED",
8589
"EXPIRED",
86-
];
90+
] satisfies TaskRunStatus[];
91+
92+
export type FINISHED_STATUSES = (typeof FINISHED_STATUSES)[number];
8793

8894
export function descriptionForTaskRunStatus(status: TaskRunStatus): string {
8995
return taskRunStatusDescriptions[status];

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
2-
import { logger } from "~/services/logger.server";
3-
import { marqs } from "~/v3/marqs/index.server";
4-
52
import { TaskRunStatus } from "@trigger.dev/database";
3+
import { logger } from "~/services/logger.server";
64
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
75
import { BaseService } from "./services/baseService.server";
6+
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
87

98
const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];
109

@@ -40,7 +39,12 @@ export class FailedTaskRunService extends BaseService {
4039
// No more retries, we need to fail the task run
4140
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });
4241

43-
await marqs?.acknowledgeMessage(taskRun.id);
42+
const finalizeService = new FinalizeTaskRunService();
43+
await finalizeService.call({
44+
id: taskRun.id,
45+
status: "SYSTEM_FAILURE",
46+
completedAt: new Date(),
47+
});
4448

4549
// Now we need to "complete" the task run event/span
4650
await eventRepository.completeEvent(taskRun.spanId, {
@@ -58,15 +62,5 @@ export class FailedTaskRunService extends BaseService {
5862
},
5963
],
6064
});
61-
62-
await this._prisma.taskRun.update({
63-
where: {
64-
id: taskRun.id,
65-
},
66-
data: {
67-
status: "SYSTEM_FAILURE",
68-
completedAt: new Date(),
69-
},
70-
});
7165
}
7266
}

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 22 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1+
import { $transaction, type PrismaClientOrTransaction, prisma } from "~/db.server";
2+
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
23
import { logger } from "~/services/logger.server";
3-
import { marqs } from "~/v3/marqs/index.server";
44
import { eventRepository } from "../eventRepository.server";
5-
import { BaseService } from "./baseService.server";
6-
7-
import { PrismaClientOrTransaction, prisma } from "~/db.server";
85
import { isCancellableRunStatus } from "../taskStatus";
6+
import { BaseService } from "./baseService.server";
7+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
98
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
109

1110
export class CancelAttemptService extends BaseService {
@@ -51,28 +50,25 @@ export class CancelAttemptService extends BaseService {
5150
return;
5251
}
5352

54-
await marqs?.acknowledgeMessage(taskRunId);
55-
56-
await this._prisma.taskRunAttempt.update({
57-
where: {
58-
friendlyId: attemptId,
59-
},
60-
data: {
61-
status: "CANCELED",
62-
completedAt: cancelledAt,
63-
taskRun: {
64-
update: {
65-
data: {
66-
status: isCancellableRunStatus(taskRunAttempt.taskRun.status)
67-
? "INTERRUPTED"
68-
: undefined,
69-
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
70-
? cancelledAt
71-
: undefined,
72-
},
73-
},
53+
await $transaction(this._prisma, async (tx) => {
54+
await tx.taskRunAttempt.update({
55+
where: {
56+
friendlyId: attemptId,
7457
},
75-
},
58+
data: {
59+
status: "CANCELED",
60+
completedAt: cancelledAt,
61+
},
62+
});
63+
64+
const finalizeService = new FinalizeTaskRunService(tx);
65+
await finalizeService.call({
66+
id: taskRunId,
67+
status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined,
68+
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
69+
? cancelledAt
70+
: undefined,
71+
});
7672
});
7773

7874
const inProgressEvents = await eventRepository.queryIncompleteEvents({

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
1-
import { Prisma, TaskRun } from "@trigger.dev/database";
1+
import { type Prisma, type TaskRun } from "@trigger.dev/database";
22
import assertNever from "assert-never";
33
import { logger } from "~/services/logger.server";
4-
import { marqs } from "~/v3/marqs/index.server";
54
import { eventRepository } from "../eventRepository.server";
65
import { socketIo } from "../handleSocketIo.server";
76
import { devPubSub } from "../marqs/devPubSub.server";
7+
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
88
import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
10-
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
1110
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
11+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
1212

1313
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1414
include: {
@@ -47,18 +47,11 @@ export class CancelTaskRunService extends BaseService {
4747
return;
4848
}
4949

50-
// Remove the task run from the queue if it's there for some reason
51-
await marqs?.acknowledgeMessage(taskRun.id);
52-
53-
// Set the task run status to cancelled
54-
const cancelledTaskRun = await this._prisma.taskRun.update({
55-
where: {
56-
id: taskRun.id,
57-
},
58-
data: {
59-
status: "CANCELED",
60-
completedAt: opts.cancelledAt,
61-
},
50+
const finalizeService = new FinalizeTaskRunService();
51+
const cancelledTaskRun = await finalizeService.call({
52+
id: taskRun.id,
53+
status: "CANCELED",
54+
completedAt: opts.cancelledAt,
6255
include: {
6356
attempts: {
6457
where: {

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 54 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import {
88
flattenAttributes,
99
sanitizeError,
1010
} from "@trigger.dev/core/v3";
11-
import { PrismaClientOrTransaction } from "~/db.server";
11+
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
1212
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1313
import { logger } from "~/services/logger.server";
1414
import { safeJsonParse } from "~/utils/json";
@@ -23,6 +23,7 @@ import { TaskRun } from "@trigger.dev/database";
2323
import { PerformTaskAttemptAlertsService } from "./alerts/performTaskAttemptAlerts.server";
2424
import { RetryAttemptService } from "./retryAttempt.server";
2525
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
26+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
2627

2728
type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;
2829

@@ -50,17 +51,30 @@ export class CompleteAttemptService extends BaseService {
5051
id: execution.attempt.id,
5152
});
5253

53-
// Update the task run to be failed
54-
await this._prisma.taskRun.update({
54+
const run = await this._prisma.taskRun.findFirst({
5555
where: {
5656
friendlyId: execution.run.id,
5757
},
58-
data: {
59-
status: "SYSTEM_FAILURE",
60-
completedAt: new Date(),
58+
select: {
59+
id: true,
6160
},
6261
});
6362

63+
if (!run) {
64+
logger.error("[CompleteAttemptService] Task run not found", {
65+
friendlyId: execution.run.id,
66+
});
67+
68+
return "COMPLETED";
69+
}
70+
71+
const finalizeService = new FinalizeTaskRunService();
72+
await finalizeService.call({
73+
id: run.id,
74+
status: "SYSTEM_FAILURE",
75+
completedAt: new Date(),
76+
});
77+
6478
// No attempt, so there's no message to ACK
6579
return "COMPLETED";
6680
}
@@ -96,28 +110,25 @@ export class CompleteAttemptService extends BaseService {
96110
taskRunAttempt: NonNullable<FoundAttempt>,
97111
env?: AuthenticatedEnvironment
98112
): Promise<"COMPLETED"> {
99-
await this._prisma.taskRunAttempt.update({
100-
where: { id: taskRunAttempt.id },
101-
data: {
102-
status: "COMPLETED",
103-
completedAt: new Date(),
104-
output: completion.output,
105-
outputType: completion.outputType,
106-
usageDurationMs: completion.usage?.durationMs,
107-
taskRun: {
108-
update: {
109-
data: {
110-
status: "COMPLETED_SUCCESSFULLY",
111-
completedAt: new Date(),
112-
},
113-
},
113+
await $transaction(this._prisma, async (tx) => {
114+
await tx.taskRunAttempt.update({
115+
where: { id: taskRunAttempt.id },
116+
data: {
117+
status: "COMPLETED",
118+
completedAt: new Date(),
119+
output: completion.output,
120+
outputType: completion.outputType,
121+
usageDurationMs: completion.usage?.durationMs,
114122
},
115-
},
116-
});
117-
118-
logger.debug("Completed attempt successfully, ACKing message");
123+
});
119124

120-
await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);
125+
const finalizeService = new FinalizeTaskRunService(tx);
126+
await finalizeService.call({
127+
id: taskRunAttempt.taskRunId,
128+
status: "COMPLETED_SUCCESSFULLY",
129+
completedAt: new Date(),
130+
});
131+
});
121132

122133
// Now we need to "complete" the task run event/span
123134
await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
@@ -255,19 +266,13 @@ export class CompleteAttemptService extends BaseService {
255266
if (!checkpointCreateResult) {
256267
logger.error("Failed to create checkpoint", { checkpoint, execution: execution.run.id });
257268

258-
// Update the task run to be failed
259-
await this._prisma.taskRun.update({
260-
where: {
261-
friendlyId: execution.run.id,
262-
},
263-
data: {
264-
status: "SYSTEM_FAILURE",
265-
completedAt: new Date(),
266-
},
269+
const finalizeService = new FinalizeTaskRunService();
270+
await finalizeService.call({
271+
id: taskRunAttempt.taskRunId,
272+
status: "SYSTEM_FAILURE",
273+
completedAt: new Date(),
267274
});
268275

269-
await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);
270-
271276
return "COMPLETED";
272277
}
273278

@@ -279,11 +284,6 @@ export class CompleteAttemptService extends BaseService {
279284

280285
return "RETRIED";
281286
} else {
282-
// No more retries, we need to fail the task run
283-
logger.debug("Completed attempt, ACKing message", taskRunAttempt);
284-
285-
await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);
286-
287287
// Now we need to "complete" the task run event/span
288288
await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
289289
endTime: new Date(),
@@ -305,6 +305,13 @@ export class CompleteAttemptService extends BaseService {
305305
sanitizedError.type === "INTERNAL_ERROR" &&
306306
sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT"
307307
) {
308+
const finalizeService = new FinalizeTaskRunService();
309+
await finalizeService.call({
310+
id: taskRunAttempt.taskRunId,
311+
status: "SYSTEM_FAILURE",
312+
completedAt: new Date(),
313+
});
314+
308315
// We need to fail all incomplete spans
309316
const inProgressEvents = await eventRepository.queryIncompleteEvents({
310317
attemptId: execution.attempt.id,
@@ -328,25 +335,12 @@ export class CompleteAttemptService extends BaseService {
328335
});
329336
})
330337
);
331-
332-
await this._prisma.taskRun.update({
333-
where: {
334-
id: taskRunAttempt.taskRunId,
335-
},
336-
data: {
337-
status: "SYSTEM_FAILURE",
338-
completedAt: new Date(),
339-
},
340-
});
341338
} else {
342-
await this._prisma.taskRun.update({
343-
where: {
344-
id: taskRunAttempt.taskRunId,
345-
},
346-
data: {
347-
status: "COMPLETED_WITH_ERRORS",
348-
completedAt: new Date(),
349-
},
339+
const finalizeService = new FinalizeTaskRunService();
340+
await finalizeService.call({
341+
id: taskRunAttempt.taskRunId,
342+
status: "COMPLETED_WITH_ERRORS",
343+
completedAt: new Date(),
350344
});
351345
}
352346

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
77
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
88
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
99
import { sanitizeError } from "@trigger.dev/core/v3";
10+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
1011

1112
export type CrashTaskRunServiceOptions = {
1213
reason?: string;
@@ -43,18 +44,11 @@ export class CrashTaskRunService extends BaseService {
4344
return;
4445
}
4546

46-
// Remove the task run from the queue if it's there for some reason
47-
await marqs?.acknowledgeMessage(taskRun.id);
48-
49-
// Set the task run status to crashed
50-
const crashedTaskRun = await this._prisma.taskRun.update({
51-
where: {
52-
id: taskRun.id,
53-
},
54-
data: {
55-
status: "CRASHED",
56-
completedAt: new Date(),
57-
},
47+
const finalizeService = new FinalizeTaskRunService();
48+
const crashedTaskRun = await finalizeService.call({
49+
id: taskRun.id,
50+
status: "CRASHED",
51+
completedAt: new Date(),
5852
include: {
5953
attempts: {
6054
where: {

0 commit comments

Comments
 (0)