Skip to content

v3: fix dependency checkpoint race #1171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/silly-buses-obey.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add callback to checkpoint created message
57 changes: 36 additions & 21 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1162,13 +1162,7 @@ class TaskCoordinator {
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}

this.#platformSocket?.send("CHECKPOINT_CREATED", {
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
docker: checkpoint.docker,
Expand All @@ -1179,6 +1173,17 @@ class TaskCoordinator {
now: message.now,
},
});

if (ack?.keepRunAlive) {
logger.log("keeping run alive after duration checkpoint", { runId: socket.data.runId });
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}
});

socket.on("WAIT_FOR_TASK", async (message, callback) => {
Expand All @@ -1205,13 +1210,7 @@ class TaskCoordinator {
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}

this.#platformSocket?.send("CHECKPOINT_CREATED", {
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
docker: checkpoint.docker,
Expand All @@ -1221,6 +1220,17 @@ class TaskCoordinator {
friendlyId: message.friendlyId,
},
});

if (ack?.keepRunAlive) {
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}
});

socket.on("WAIT_FOR_BATCH", async (message, callback) => {
Expand All @@ -1247,13 +1257,7 @@ class TaskCoordinator {
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}

this.#platformSocket?.send("CHECKPOINT_CREATED", {
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
attemptFriendlyId: message.attemptFriendlyId,
docker: checkpoint.docker,
Expand All @@ -1264,6 +1268,17 @@ class TaskCoordinator {
runFriendlyIds: message.runFriendlyIds,
},
});

if (ack?.keepRunAlive) {
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
return;
}

if (!checkpoint.docker || !willSimulate) {
socket.emit("REQUEST_EXIT", {
version: "v1",
});
}
});

socket.on("INDEX_TASKS", async (message, callback) => {
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/presenters/v3/RunListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { Direction } from "~/components/runs/RunStatuses";
import { FINISHED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
import { sqlDatabaseSchema } from "~/db.server";
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
import { CANCELLABLE_STATUSES } from "~/v3/services/cancelTaskRun.server";
import { BasePresenter } from "./basePresenter.server";
import { isCancellableRunStatus } from "~/v3/taskStatus";

export type RunListOptions = {
userId?: string;
Expand Down Expand Up @@ -291,7 +291,7 @@ export class RunListPresenter extends BasePresenter {
taskIdentifier: run.taskIdentifier,
spanId: run.spanId,
isReplayable: true,
isCancellable: CANCELLABLE_STATUSES.includes(run.status),
isCancellable: isCancellableRunStatus(run.status),
environment: displayableEnvironment(environment, userId),
idempotencyKey: run.idempotencyKey ? run.idempotencyKey : undefined,
};
Expand Down
15 changes: 13 additions & 2 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,19 @@ function createCoordinatorNamespace(io: Server) {
await sharedQueueTasks.taskRunHeartbeat(message.runId);
},
CHECKPOINT_CREATED: async (message) => {
const createCheckpoint = new CreateCheckpointService();
await createCheckpoint.call(message);
try {
const createCheckpoint = new CreateCheckpointService();
const result = await createCheckpoint.call(message);

return { keepRunAlive: result?.keepRunAlive ?? false };
} catch (error) {
logger.error("Error while creating checkpoint", {
rawMessage: message,
error: error instanceof Error ? error.message : error,
});

return { keepRunAlive: false };
}
},
CREATE_WORKER: async (message) => {
try {
Expand Down
16 changes: 2 additions & 14 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import {
BackgroundWorkerTask,
RuntimeEnvironment,
TaskRun,
TaskRunAttemptStatus,
TaskRunStatus,
} from "@trigger.dev/database";
import { z } from "zod";
Expand All @@ -43,6 +42,7 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
import { EnvironmentVariable } from "../environmentVariables/repository";
import { machinePresetFromConfig } from "../machinePresets.server";
import { env } from "~/env.server";
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";

const WithTraceContext = z.object({
traceparent: z.string().optional(),
Expand Down Expand Up @@ -962,19 +962,7 @@ class SharedQueueTasks {
}

if (setToExecuting) {
const FINAL_RUN_STATUSES: TaskRunStatus[] = [
"CANCELED",
"COMPLETED_SUCCESSFULLY",
"COMPLETED_WITH_ERRORS",
"INTERRUPTED",
"SYSTEM_FAILURE",
];
const FINAL_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["CANCELED", "COMPLETED", "FAILED"];

if (
FINAL_ATTEMPT_STATUSES.includes(attempt.status) ||
FINAL_RUN_STATUSES.includes(attempt.taskRun.status)
) {
if (isFinalAttemptStatus(attempt.status) || isFinalRunStatus(attempt.taskRun.status)) {
logger.error("Status already in final state", {
attempt: {
id: attempt.id,
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/cancelAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { logger } from "~/services/logger.server";

import { PrismaClientOrTransaction, prisma } from "~/db.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
import { CANCELLABLE_STATUSES } from "./cancelTaskRun.server";
import { isCancellableRunStatus } from "../taskStatus";

export class CancelAttemptService extends BaseService {
public async call(
Expand Down Expand Up @@ -55,7 +55,7 @@ export class CancelAttemptService extends BaseService {
taskRun: {
update: {
data: {
status: CANCELLABLE_STATUSES.includes(taskRunAttempt.taskRun.status)
status: isCancellableRunStatus(taskRunAttempt.taskRun.status)
? "INTERRUPTED"
: undefined,
},
Expand Down
25 changes: 7 additions & 18 deletions apps/webapp/app/v3/services/cancelTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Prisma, TaskRun, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
import { Prisma, TaskRun } from "@trigger.dev/database";
import assertNever from "assert-never";
import { logger } from "~/services/logger.server";
import { marqs } from "~/v3/marqs/index.server";
Expand All @@ -7,22 +7,7 @@ import { socketIo } from "../handleSocketIo.server";
import { devPubSub } from "../marqs/devPubSub.server";
import { BaseService } from "./baseService.server";
import { CancelAttemptService } from "./cancelAttempt.server";

export const CANCELLABLE_STATUSES: Array<TaskRunStatus> = [
"PENDING",
"WAITING_FOR_DEPLOY",
"EXECUTING",
"PAUSED",
"WAITING_TO_RESUME",
"PAUSED",
"RETRYING_AFTER_FAILURE",
];

const CANCELLABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = [
"EXECUTING",
"PAUSED",
"PENDING",
];
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";

type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
include: {
Expand Down Expand Up @@ -53,7 +38,11 @@ export class CancelTaskRunService extends BaseService {
};

// Make sure the task run is in a cancellable state
if (!CANCELLABLE_STATUSES.includes(taskRun.status)) {
if (!isCancellableRunStatus(taskRun.status)) {
logger.error("Task run is not in a cancellable state", {
runId: taskRun.id,
status: taskRun.status,
});
return;
}

Expand Down
23 changes: 4 additions & 19 deletions apps/webapp/app/v3/services/crashTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,11 @@
import {
TaskRun,
TaskRunAttempt,
TaskRunAttemptStatus,
TaskRunStatus,
} from "@trigger.dev/database";
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
import { eventRepository } from "../eventRepository.server";
import { marqs } from "~/v3/marqs/index.server";
import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";

export const CRASHABLE_RUN_STATUSES: Array<TaskRunStatus> = [
"PENDING",
"WAITING_FOR_DEPLOY",
"EXECUTING",
"PAUSED",
"WAITING_TO_RESUME",
"PAUSED",
"RETRYING_AFTER_FAILURE",
];

const CRASHABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = ["EXECUTING", "PAUSED", "PENDING"];
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";

export type CrashTaskRunServiceOptions = {
reason?: string;
Expand Down Expand Up @@ -52,7 +36,8 @@ export class CrashTaskRunService extends BaseService {
}

// Make sure the task run is in a crashable state
if (!CRASHABLE_RUN_STATUSES.includes(taskRun.status)) {
if (!isCrashableRunStatus(taskRun.status)) {
logger.error("Task run is not in a crashable state", { runId, status: taskRun.status });
return;
}

Expand Down
Loading
Loading