Skip to content

Commit 55d1f8c

Browse files
authored
v3: fix dependency checkpoint race (#1171)
* consolidate task statuses and utils * check dependency completion when creating checkpoints * add changeset
1 parent 9835f4e commit 55d1f8c

File tree

12 files changed

+205
-100
lines changed

12 files changed

+205
-100
lines changed

.changeset/silly-buses-obey.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add callback to checkpoint created message

apps/coordinator/src/index.ts

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,13 +1162,7 @@ class TaskCoordinator {
11621162
return;
11631163
}
11641164

1165-
if (!checkpoint.docker || !willSimulate) {
1166-
socket.emit("REQUEST_EXIT", {
1167-
version: "v1",
1168-
});
1169-
}
1170-
1171-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1165+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
11721166
version: "v1",
11731167
attemptFriendlyId: message.attemptFriendlyId,
11741168
docker: checkpoint.docker,
@@ -1179,6 +1173,17 @@ class TaskCoordinator {
11791173
now: message.now,
11801174
},
11811175
});
1176+
1177+
if (ack?.keepRunAlive) {
1178+
logger.log("keeping run alive after duration checkpoint", { runId: socket.data.runId });
1179+
return;
1180+
}
1181+
1182+
if (!checkpoint.docker || !willSimulate) {
1183+
socket.emit("REQUEST_EXIT", {
1184+
version: "v1",
1185+
});
1186+
}
11821187
});
11831188

11841189
socket.on("WAIT_FOR_TASK", async (message, callback) => {
@@ -1205,13 +1210,7 @@ class TaskCoordinator {
12051210
return;
12061211
}
12071212

1208-
if (!checkpoint.docker || !willSimulate) {
1209-
socket.emit("REQUEST_EXIT", {
1210-
version: "v1",
1211-
});
1212-
}
1213-
1214-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1213+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
12151214
version: "v1",
12161215
attemptFriendlyId: message.attemptFriendlyId,
12171216
docker: checkpoint.docker,
@@ -1221,6 +1220,17 @@ class TaskCoordinator {
12211220
friendlyId: message.friendlyId,
12221221
},
12231222
});
1223+
1224+
if (ack?.keepRunAlive) {
1225+
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
1226+
return;
1227+
}
1228+
1229+
if (!checkpoint.docker || !willSimulate) {
1230+
socket.emit("REQUEST_EXIT", {
1231+
version: "v1",
1232+
});
1233+
}
12241234
});
12251235

12261236
socket.on("WAIT_FOR_BATCH", async (message, callback) => {
@@ -1247,13 +1257,7 @@ class TaskCoordinator {
12471257
return;
12481258
}
12491259

1250-
if (!checkpoint.docker || !willSimulate) {
1251-
socket.emit("REQUEST_EXIT", {
1252-
version: "v1",
1253-
});
1254-
}
1255-
1256-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1260+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
12571261
version: "v1",
12581262
attemptFriendlyId: message.attemptFriendlyId,
12591263
docker: checkpoint.docker,
@@ -1264,6 +1268,17 @@ class TaskCoordinator {
12641268
runFriendlyIds: message.runFriendlyIds,
12651269
},
12661270
});
1271+
1272+
if (ack?.keepRunAlive) {
1273+
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
1274+
return;
1275+
}
1276+
1277+
if (!checkpoint.docker || !willSimulate) {
1278+
socket.emit("REQUEST_EXIT", {
1279+
version: "v1",
1280+
});
1281+
}
12671282
});
12681283

12691284
socket.on("INDEX_TASKS", async (message, callback) => {

apps/webapp/app/presenters/v3/RunListPresenter.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { Direction } from "~/components/runs/RunStatuses";
44
import { FINISHED_STATUSES } from "~/components/runs/v3/TaskRunStatus";
55
import { sqlDatabaseSchema } from "~/db.server";
66
import { displayableEnvironment } from "~/models/runtimeEnvironment.server";
7-
import { CANCELLABLE_STATUSES } from "~/v3/services/cancelTaskRun.server";
87
import { BasePresenter } from "./basePresenter.server";
8+
import { isCancellableRunStatus } from "~/v3/taskStatus";
99

1010
export type RunListOptions = {
1111
userId?: string;
@@ -291,7 +291,7 @@ export class RunListPresenter extends BasePresenter {
291291
taskIdentifier: run.taskIdentifier,
292292
spanId: run.spanId,
293293
isReplayable: true,
294-
isCancellable: CANCELLABLE_STATUSES.includes(run.status),
294+
isCancellable: isCancellableRunStatus(run.status),
295295
environment: displayableEnvironment(environment, userId),
296296
idempotencyKey: run.idempotencyKey ? run.idempotencyKey : undefined,
297297
};

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,19 @@ function createCoordinatorNamespace(io: Server) {
138138
await sharedQueueTasks.taskRunHeartbeat(message.runId);
139139
},
140140
CHECKPOINT_CREATED: async (message) => {
141-
const createCheckpoint = new CreateCheckpointService();
142-
await createCheckpoint.call(message);
141+
try {
142+
const createCheckpoint = new CreateCheckpointService();
143+
const result = await createCheckpoint.call(message);
144+
145+
return { keepRunAlive: result?.keepRunAlive ?? false };
146+
} catch (error) {
147+
logger.error("Error while creating checkpoint", {
148+
rawMessage: message,
149+
error: error instanceof Error ? error.message : error,
150+
});
151+
152+
return { keepRunAlive: false };
153+
}
143154
},
144155
CREATE_WORKER: async (message) => {
145156
try {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import {
1717
BackgroundWorkerTask,
1818
RuntimeEnvironment,
1919
TaskRun,
20-
TaskRunAttemptStatus,
2120
TaskRunStatus,
2221
} from "@trigger.dev/database";
2322
import { z } from "zod";
@@ -43,6 +42,7 @@ import { generateJWTTokenForEnvironment } from "~/services/apiAuth.server";
4342
import { EnvironmentVariable } from "../environmentVariables/repository";
4443
import { machinePresetFromConfig } from "../machinePresets.server";
4544
import { env } from "~/env.server";
45+
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
4646

4747
const WithTraceContext = z.object({
4848
traceparent: z.string().optional(),
@@ -962,19 +962,7 @@ class SharedQueueTasks {
962962
}
963963

964964
if (setToExecuting) {
965-
const FINAL_RUN_STATUSES: TaskRunStatus[] = [
966-
"CANCELED",
967-
"COMPLETED_SUCCESSFULLY",
968-
"COMPLETED_WITH_ERRORS",
969-
"INTERRUPTED",
970-
"SYSTEM_FAILURE",
971-
];
972-
const FINAL_ATTEMPT_STATUSES: TaskRunAttemptStatus[] = ["CANCELED", "COMPLETED", "FAILED"];
973-
974-
if (
975-
FINAL_ATTEMPT_STATUSES.includes(attempt.status) ||
976-
FINAL_RUN_STATUSES.includes(attempt.taskRun.status)
977-
) {
965+
if (isFinalAttemptStatus(attempt.status) || isFinalRunStatus(attempt.taskRun.status)) {
978966
logger.error("Status already in final state", {
979967
attempt: {
980968
id: attempt.id,

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { logger } from "~/services/logger.server";
66

77
import { PrismaClientOrTransaction, prisma } from "~/db.server";
88
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
9-
import { CANCELLABLE_STATUSES } from "./cancelTaskRun.server";
9+
import { isCancellableRunStatus } from "../taskStatus";
1010

1111
export class CancelAttemptService extends BaseService {
1212
public async call(
@@ -55,7 +55,7 @@ export class CancelAttemptService extends BaseService {
5555
taskRun: {
5656
update: {
5757
data: {
58-
status: CANCELLABLE_STATUSES.includes(taskRunAttempt.taskRun.status)
58+
status: isCancellableRunStatus(taskRunAttempt.taskRun.status)
5959
? "INTERRUPTED"
6060
: undefined,
6161
},

apps/webapp/app/v3/services/cancelTaskRun.server.ts

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Prisma, TaskRun, TaskRunAttemptStatus, TaskRunStatus } from "@trigger.dev/database";
1+
import { Prisma, TaskRun } from "@trigger.dev/database";
22
import assertNever from "assert-never";
33
import { logger } from "~/services/logger.server";
44
import { marqs } from "~/v3/marqs/index.server";
@@ -7,22 +7,7 @@ import { socketIo } from "../handleSocketIo.server";
77
import { devPubSub } from "../marqs/devPubSub.server";
88
import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
10-
11-
export const CANCELLABLE_STATUSES: Array<TaskRunStatus> = [
12-
"PENDING",
13-
"WAITING_FOR_DEPLOY",
14-
"EXECUTING",
15-
"PAUSED",
16-
"WAITING_TO_RESUME",
17-
"PAUSED",
18-
"RETRYING_AFTER_FAILURE",
19-
];
20-
21-
const CANCELLABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = [
22-
"EXECUTING",
23-
"PAUSED",
24-
"PENDING",
25-
];
10+
import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskStatus";
2611

2712
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
2813
include: {
@@ -53,7 +38,11 @@ export class CancelTaskRunService extends BaseService {
5338
};
5439

5540
// Make sure the task run is in a cancellable state
56-
if (!CANCELLABLE_STATUSES.includes(taskRun.status)) {
41+
if (!isCancellableRunStatus(taskRun.status)) {
42+
logger.error("Task run is not in a cancellable state", {
43+
runId: taskRun.id,
44+
status: taskRun.status,
45+
});
5746
return;
5847
}
5948

apps/webapp/app/v3/services/crashTaskRun.server.ts

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,11 @@
1-
import {
2-
TaskRun,
3-
TaskRunAttempt,
4-
TaskRunAttemptStatus,
5-
TaskRunStatus,
6-
} from "@trigger.dev/database";
1+
import { TaskRun, TaskRunAttempt } from "@trigger.dev/database";
72
import { eventRepository } from "../eventRepository.server";
83
import { marqs } from "~/v3/marqs/index.server";
94
import { BaseService } from "./baseService.server";
105
import { logger } from "~/services/logger.server";
116
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
127
import { ResumeTaskRunDependenciesService } from "./resumeTaskRunDependencies.server";
13-
14-
export const CRASHABLE_RUN_STATUSES: Array<TaskRunStatus> = [
15-
"PENDING",
16-
"WAITING_FOR_DEPLOY",
17-
"EXECUTING",
18-
"PAUSED",
19-
"WAITING_TO_RESUME",
20-
"PAUSED",
21-
"RETRYING_AFTER_FAILURE",
22-
];
23-
24-
const CRASHABLE_ATTEMPT_STATUSES: Array<TaskRunAttemptStatus> = ["EXECUTING", "PAUSED", "PENDING"];
8+
import { CRASHABLE_ATTEMPT_STATUSES, isCrashableRunStatus } from "../taskStatus";
259

2610
export type CrashTaskRunServiceOptions = {
2711
reason?: string;
@@ -52,7 +36,8 @@ export class CrashTaskRunService extends BaseService {
5236
}
5337

5438
// Make sure the task run is in a crashable state
55-
if (!CRASHABLE_RUN_STATUSES.includes(taskRun.status)) {
39+
if (!isCrashableRunStatus(taskRun.status)) {
40+
logger.error("Task run is not in a crashable state", { runId, status: taskRun.status });
5641
return;
5742
}
5843

0 commit comments

Comments
 (0)