Skip to content

Commit 265c8c2

Browse files
committed
check dependency completion when creating checkpoints
1 parent 0f7a889 commit 265c8c2

File tree

4 files changed

+107
-27
lines changed

4 files changed

+107
-27
lines changed

apps/coordinator/src/index.ts

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1162,13 +1162,7 @@ class TaskCoordinator {
11621162
return;
11631163
}
11641164

1165-
if (!checkpoint.docker || !willSimulate) {
1166-
socket.emit("REQUEST_EXIT", {
1167-
version: "v1",
1168-
});
1169-
}
1170-
1171-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1165+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
11721166
version: "v1",
11731167
attemptFriendlyId: message.attemptFriendlyId,
11741168
docker: checkpoint.docker,
@@ -1179,6 +1173,17 @@ class TaskCoordinator {
11791173
now: message.now,
11801174
},
11811175
});
1176+
1177+
if (ack?.keepRunAlive) {
1178+
logger.log("keeping run alive after duration checkpoint", { runId: socket.data.runId });
1179+
return;
1180+
}
1181+
1182+
if (!checkpoint.docker || !willSimulate) {
1183+
socket.emit("REQUEST_EXIT", {
1184+
version: "v1",
1185+
});
1186+
}
11821187
});
11831188

11841189
socket.on("WAIT_FOR_TASK", async (message, callback) => {
@@ -1205,13 +1210,7 @@ class TaskCoordinator {
12051210
return;
12061211
}
12071212

1208-
if (!checkpoint.docker || !willSimulate) {
1209-
socket.emit("REQUEST_EXIT", {
1210-
version: "v1",
1211-
});
1212-
}
1213-
1214-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1213+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
12151214
version: "v1",
12161215
attemptFriendlyId: message.attemptFriendlyId,
12171216
docker: checkpoint.docker,
@@ -1221,6 +1220,17 @@ class TaskCoordinator {
12211220
friendlyId: message.friendlyId,
12221221
},
12231222
});
1223+
1224+
if (ack?.keepRunAlive) {
1225+
logger.log("keeping run alive after task checkpoint", { runId: socket.data.runId });
1226+
return;
1227+
}
1228+
1229+
if (!checkpoint.docker || !willSimulate) {
1230+
socket.emit("REQUEST_EXIT", {
1231+
version: "v1",
1232+
});
1233+
}
12241234
});
12251235

12261236
socket.on("WAIT_FOR_BATCH", async (message, callback) => {
@@ -1247,13 +1257,7 @@ class TaskCoordinator {
12471257
return;
12481258
}
12491259

1250-
if (!checkpoint.docker || !willSimulate) {
1251-
socket.emit("REQUEST_EXIT", {
1252-
version: "v1",
1253-
});
1254-
}
1255-
1256-
this.#platformSocket?.send("CHECKPOINT_CREATED", {
1260+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
12571261
version: "v1",
12581262
attemptFriendlyId: message.attemptFriendlyId,
12591263
docker: checkpoint.docker,
@@ -1264,6 +1268,17 @@ class TaskCoordinator {
12641268
runFriendlyIds: message.runFriendlyIds,
12651269
},
12661270
});
1271+
1272+
if (ack?.keepRunAlive) {
1273+
logger.log("keeping run alive after batch checkpoint", { runId: socket.data.runId });
1274+
return;
1275+
}
1276+
1277+
if (!checkpoint.docker || !willSimulate) {
1278+
socket.emit("REQUEST_EXIT", {
1279+
version: "v1",
1280+
});
1281+
}
12671282
});
12681283

12691284
socket.on("INDEX_TASKS", async (message, callback) => {

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,19 @@ function createCoordinatorNamespace(io: Server) {
138138
await sharedQueueTasks.taskRunHeartbeat(message.runId);
139139
},
140140
CHECKPOINT_CREATED: async (message) => {
141-
const createCheckpoint = new CreateCheckpointService();
142-
await createCheckpoint.call(message);
141+
try {
142+
const createCheckpoint = new CreateCheckpointService();
143+
const result = await createCheckpoint.call(message);
144+
145+
return { keepRunAlive: result?.keepRunAlive ?? false };
146+
} catch (error) {
147+
logger.error("Error while creating checkpoint", {
148+
rawMessage: message,
149+
error: error instanceof Error ? error.message : error,
150+
});
151+
152+
return { keepRunAlive: false };
153+
}
143154
},
144155
CREATE_WORKER: async (message) => {
145156
try {

apps/webapp/app/v3/services/createCheckpoint.server.ts

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { CoordinatorToPlatformMessages } from "@trigger.dev/core/v3";
22
import type { InferSocketMessageSchema } from "@trigger.dev/core/v3/zodSocket";
3-
import type { CheckpointRestoreEvent } from "@trigger.dev/database";
3+
import type { Checkpoint, CheckpointRestoreEvent } from "@trigger.dev/database";
44
import { logger } from "~/services/logger.server";
55
import { generateFriendlyId } from "../friendlyIdentifiers";
66
import { marqs } from "~/v3/marqs/index.server";
@@ -15,7 +15,14 @@ export class CreateCheckpointService extends BaseService {
1515
InferSocketMessageSchema<typeof CoordinatorToPlatformMessages, "CHECKPOINT_CREATED">,
1616
"version"
1717
>
18-
) {
18+
): Promise<
19+
| {
20+
checkpoint: Checkpoint;
21+
event: CheckpointRestoreEvent;
22+
keepRunAlive: boolean;
23+
}
24+
| undefined
25+
> {
1926
logger.debug(`Creating checkpoint`, params);
2027

2128
const attempt = await this._prisma.taskRunAttempt.findUnique({
@@ -109,7 +116,9 @@ export class CreateCheckpointService extends BaseService {
109116
});
110117

111118
const { reason } = params;
119+
112120
let checkpointEvent: CheckpointRestoreEvent | undefined;
121+
let keepRunAlive = false;
113122

114123
switch (reason.type) {
115124
case "WAIT_FOR_DURATION": {
@@ -125,7 +134,12 @@ export class CreateCheckpointService extends BaseService {
125134
dependencyFriendlyRunId: reason.friendlyId,
126135
});
127136

128-
await marqs?.acknowledgeMessage(attempt.taskRunId);
137+
keepRunAlive = await this.#isRunCompleted(reason.friendlyId);
138+
139+
if (!keepRunAlive) {
140+
await marqs?.acknowledgeMessage(attempt.taskRunId);
141+
}
142+
129143
break;
130144
}
131145
case "WAIT_FOR_BATCH": {
@@ -134,7 +148,12 @@ export class CreateCheckpointService extends BaseService {
134148
batchDependencyFriendlyId: reason.batchFriendlyId,
135149
});
136150

137-
await marqs?.acknowledgeMessage(attempt.taskRunId);
151+
keepRunAlive = await this.#isBatchCompleted(reason.batchFriendlyId);
152+
153+
if (!keepRunAlive) {
154+
await marqs?.acknowledgeMessage(attempt.taskRunId);
155+
}
156+
138157
break;
139158
}
140159
case "RETRYING_AFTER_FAILURE": {
@@ -174,6 +193,37 @@ export class CreateCheckpointService extends BaseService {
174193
return {
175194
checkpoint,
176195
event: checkpointEvent,
196+
keepRunAlive,
177197
};
178198
}
199+
200+
async #isBatchCompleted(friendlyId: string): Promise<boolean> {
201+
const batch = await this._prisma.batchTaskRun.findUnique({
202+
where: {
203+
friendlyId,
204+
},
205+
});
206+
207+
if (!batch) {
208+
logger.error("Batch not found", { friendlyId });
209+
return false;
210+
}
211+
212+
return batch.status === "COMPLETED";
213+
}
214+
215+
async #isRunCompleted(friendlyId: string): Promise<boolean> {
216+
const run = await this._prisma.taskRun.findUnique({
217+
where: {
218+
friendlyId,
219+
},
220+
});
221+
222+
if (!run) {
223+
logger.error("Run not found", { friendlyId });
224+
return false;
225+
}
226+
227+
return isFinalRunStatus(run.status);
228+
}
179229
}

packages/core/src/v3/schemas/messages.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,10 @@ export const CoordinatorToPlatformMessages = {
554554
}),
555555
]),
556556
}),
557+
callback: z.object({
558+
version: z.literal("v1").default("v1"),
559+
keepRunAlive: z.boolean(),
560+
}),
557561
},
558562
INDEXING_FAILED: {
559563
message: z.object({

0 commit comments

Comments
 (0)