Skip to content

FinalizeRunService #1250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions apps/webapp/app/components/runs/v3/TaskRunStatus.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -67,23 +67,29 @@ const taskRunStatusDescriptions: Record<TaskRunStatus, string> = {
EXPIRED: "Task has surpassed its ttl and won't be executed",
};

export const QUEUED_STATUSES: TaskRunStatus[] = ["PENDING", "WAITING_FOR_DEPLOY", "DELAYED"];
export const QUEUED_STATUSES = [
"PENDING",
"WAITING_FOR_DEPLOY",
"DELAYED",
] satisfies TaskRunStatus[];

export const RUNNING_STATUSES: TaskRunStatus[] = [
export const RUNNING_STATUSES = [
"EXECUTING",
"RETRYING_AFTER_FAILURE",
"WAITING_TO_RESUME",
];
] satisfies TaskRunStatus[];

export const FINISHED_STATUSES: TaskRunStatus[] = [
export const FINISHED_STATUSES = [
"COMPLETED_SUCCESSFULLY",
"CANCELED",
"COMPLETED_WITH_ERRORS",
"INTERRUPTED",
"SYSTEM_FAILURE",
"CRASHED",
"EXPIRED",
];
] satisfies TaskRunStatus[];

export type FINISHED_STATUSES = (typeof FINISHED_STATUSES)[number];

export function descriptionForTaskRunStatus(status: TaskRunStatus): string {
return taskRunStatusDescriptions[status];
Expand Down
22 changes: 8 additions & 14 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";

import { TaskRunStatus } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
import { BaseService } from "./services/baseService.server";
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";

const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];

Expand Down Expand Up @@ -40,7 +39,12 @@ export class FailedTaskRunService extends BaseService {
// No more retries, we need to fail the task run
logger.debug("[FailedTaskRunService] Failing task run", { taskRun, completion });

await marqs?.acknowledgeMessage(taskRun.id);
const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRun.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
});

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRun.spanId, {
Expand All @@ -58,15 +62,5 @@ export class FailedTaskRunService extends BaseService {
},
],
});

await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
status: "SYSTEM_FAILURE",
completedAt: new Date(),
},
});
}
}
48 changes: 22 additions & 26 deletions apps/webapp/app/v3/services/cancelAttempt.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { $transaction, type PrismaClientOrTransaction, prisma } from "~/db.server";
import { type AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { eventRepository } from "../eventRepository.server";
import { BaseService } from "./baseService.server";

import { PrismaClientOrTransaction, prisma } from "~/db.server";
import { isCancellableRunStatus } from "../taskStatus";
import { BaseService } from "./baseService.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";

export class CancelAttemptService extends BaseService {
Expand Down Expand Up @@ -51,28 +50,25 @@ export class CancelAttemptService extends BaseService {
return;
}

await marqs?.acknowledgeMessage(taskRunId);

await this._prisma.taskRunAttempt.update({
where: {
friendlyId: attemptId,
},
data: {
status: "CANCELED",
completedAt: cancelledAt,
taskRun: {
update: {
data: {
status: isCancellableRunStatus(taskRunAttempt.taskRun.status)
? "INTERRUPTED"
: undefined,
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
? cancelledAt
: undefined,
},
},
await $transaction(this._prisma, async (tx) => {
await tx.taskRunAttempt.update({
where: {
friendlyId: attemptId,
},
},
data: {
status: "CANCELED",
completedAt: cancelledAt,
},
});

const finalizeService = new FinalizeTaskRunService(tx);
await finalizeService.call({
id: taskRunId,
status: isCancellableRunStatus(taskRunAttempt.taskRun.status) ? "INTERRUPTED" : undefined,
completedAt: isCancellableRunStatus(taskRunAttempt.taskRun.status)
? cancelledAt
: undefined,
});
});

const inProgressEvents = await eventRepository.queryIncompleteEvents({
Expand Down
23 changes: 8 additions & 15 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { Prisma, TaskRun } from "@trigger.dev/database";
import { type Prisma, type TaskRun } from "@trigger.dev/database";
import assertNever from "assert-never";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
import { eventRepository } from "../eventRepository.server";
import { socketIo } from "../handleSocketIo.server";
import { devPubSub } from "../marqs/devPubSub.server";
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
import { BaseService } from "./baseService.server";
import { CancelAttemptService } from "./cancelAttempt.server";
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";

type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
include: {
Expand Down Expand Up @@ -47,18 +47,11 @@ export class CancelTaskRunService extends BaseService {
return;
}

// Remove the task run from the queue if it's there for some reason
await marqs?.acknowledgeMessage(taskRun.id);

// Set the task run status to cancelled
const cancelledTaskRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
status: "CANCELED",
completedAt: opts.cancelledAt,
},
const finalizeService = new FinalizeTaskRunService();
const cancelledTaskRun = await finalizeService.call({
id: taskRun.id,
status: "CANCELED",
completedAt: opts.cancelledAt,
include: {
attempts: {
where: {
Expand Down
114 changes: 54 additions & 60 deletions apps/webapp/app/v3/services/completeAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
flattenAttributes,
sanitizeError,
} from "@trigger.dev/core/v3";
import { PrismaClientOrTransaction } from "~/db.server";
import { $transaction, PrismaClientOrTransaction } from "~/db.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { safeJsonParse } from "~/utils/json";
Expand All @@ -23,6 +23,7 @@ import { TaskRun } from "@trigger.dev/database";
import { PerformTaskAttemptAlertsService } from "./alerts/performTaskAttemptAlerts.server";
import { RetryAttemptService } from "./retryAttempt.server";
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";

type FoundAttempt = Awaited<ReturnType<typeof findAttempt>>;

Expand Down Expand Up @@ -50,17 +51,30 @@ export class CompleteAttemptService extends BaseService {
id: execution.attempt.id,
});

// Update the task run to be failed
await this._prisma.taskRun.update({
const run = await this._prisma.taskRun.findFirst({
where: {
friendlyId: execution.run.id,
},
data: {
status: "SYSTEM_FAILURE",
completedAt: new Date(),
select: {
id: true,
},
});

if (!run) {
logger.error("[CompleteAttemptService] Task run not found", {
friendlyId: execution.run.id,
});

return "COMPLETED";
}

const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: run.id,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
});

// No attempt, so there's no message to ACK
return "COMPLETED";
}
Expand Down Expand Up @@ -96,28 +110,25 @@ export class CompleteAttemptService extends BaseService {
taskRunAttempt: NonNullable<FoundAttempt>,
env?: AuthenticatedEnvironment
): Promise<"COMPLETED"> {
await this._prisma.taskRunAttempt.update({
where: { id: taskRunAttempt.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: completion.output,
outputType: completion.outputType,
usageDurationMs: completion.usage?.durationMs,
taskRun: {
update: {
data: {
status: "COMPLETED_SUCCESSFULLY",
completedAt: new Date(),
},
},
await $transaction(this._prisma, async (tx) => {
await tx.taskRunAttempt.update({
where: { id: taskRunAttempt.id },
data: {
status: "COMPLETED",
completedAt: new Date(),
output: completion.output,
outputType: completion.outputType,
usageDurationMs: completion.usage?.durationMs,
},
},
});

logger.debug("Completed attempt successfully, ACKing message");
});

await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);
const finalizeService = new FinalizeTaskRunService(tx);
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "COMPLETED_SUCCESSFULLY",
completedAt: new Date(),
});
});

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
Expand Down Expand Up @@ -255,19 +266,13 @@ export class CompleteAttemptService extends BaseService {
if (!checkpointCreateResult) {
logger.error("Failed to create checkpoint", { checkpoint, execution: execution.run.id });

// Update the task run to be failed
await this._prisma.taskRun.update({
where: {
friendlyId: execution.run.id,
},
data: {
status: "SYSTEM_FAILURE",
completedAt: new Date(),
},
const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
});

await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);

return "COMPLETED";
}

Expand All @@ -279,11 +284,6 @@ export class CompleteAttemptService extends BaseService {

return "RETRIED";
} else {
// No more retries, we need to fail the task run
logger.debug("Completed attempt, ACKing message", taskRunAttempt);

await marqs?.acknowledgeMessage(taskRunAttempt.taskRunId);

// Now we need to "complete" the task run event/span
await eventRepository.completeEvent(taskRunAttempt.taskRun.spanId, {
endTime: new Date(),
Expand All @@ -305,6 +305,13 @@ export class CompleteAttemptService extends BaseService {
sanitizedError.type === "INTERNAL_ERROR" &&
sanitizedError.code === "GRACEFUL_EXIT_TIMEOUT"
) {
const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "SYSTEM_FAILURE",
completedAt: new Date(),
});

// We need to fail all incomplete spans
const inProgressEvents = await eventRepository.queryIncompleteEvents({
attemptId: execution.attempt.id,
Expand All @@ -328,25 +335,12 @@ export class CompleteAttemptService extends BaseService {
});
})
);

await this._prisma.taskRun.update({
where: {
id: taskRunAttempt.taskRunId,
},
data: {
status: "SYSTEM_FAILURE",
completedAt: new Date(),
},
});
} else {
await this._prisma.taskRun.update({
where: {
id: taskRunAttempt.taskRunId,
},
data: {
status: "COMPLETED_WITH_ERRORS",
completedAt: new Date(),
},
const finalizeService = new FinalizeTaskRunService();
await finalizeService.call({
id: taskRunAttempt.taskRunId,
status: "COMPLETED_WITH_ERRORS",
completedAt: new Date(),
});
}

Expand Down
18 changes: 6 additions & 12 deletions apps/webapp/app/v3/services/crashTaskRun.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
import { sanitizeError } from "@trigger.dev/core/v3";
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";

export type CrashTaskRunServiceOptions = {
reason?: string;
Expand Down Expand Up @@ -43,18 +44,11 @@ export class CrashTaskRunService extends BaseService {
return;
}

// Remove the task run from the queue if it's there for some reason
await marqs?.acknowledgeMessage(taskRun.id);

// Set the task run status to crashed
const crashedTaskRun = await this._prisma.taskRun.update({
where: {
id: taskRun.id,
},
data: {
status: "CRASHED",
completedAt: new Date(),
},
const finalizeService = new FinalizeTaskRunService();
const crashedTaskRun = await finalizeService.call({
id: taskRun.id,
status: "CRASHED",
completedAt: new Date(),
include: {
attempts: {
where: {
Expand Down
Loading
Loading