Skip to content

Commit 36fea7d

Browse files
authored
Add support for manual checkpoints (#1709)
* don't crash run on lazy attempt errors and count on retry * remove abort controller after checkpoint in all cases * duration checkpoints * manual checkpoints * add busybox to coordinator image * add async toggle to process in background * remove early returns * add changeset for manual checkpoint schema
1 parent 3772334 commit 36fea7d

File tree

10 files changed

+407
-21
lines changed

10 files changed

+407
-21
lines changed

.changeset/wise-mirrors-hug.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Add manual checkpoint schema

apps/coordinator/Containerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ RUN find . -name "node_modules" -type d -prune -exec rm -rf '{}' +
1313
FROM node-20 AS base
1414

1515
RUN apt-get update \
16-
&& apt-get install -y buildah ca-certificates dumb-init docker.io \
16+
&& apt-get install -y buildah ca-certificates dumb-init docker.io busybox \
1717
&& rm -rf /var/lib/apt/lists/*
1818

1919
COPY --chown=node:node .gitignore .gitignore

apps/coordinator/src/checkpointer.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,14 @@ export class Checkpointer {
415415
const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
416416
const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
417417

418+
const removeCurrentAbortController = () => {
419+
// Ensure only the current controller is removed
420+
if (this.#abortControllers.get(runId) === controller) {
421+
this.#abortControllers.delete(runId);
422+
}
423+
controller.signal.removeEventListener("abort", onAbort);
424+
};
425+
418426
const cleanup = async () => {
419427
const metadata = {
420428
runId,
@@ -424,6 +432,7 @@ export class Checkpointer {
424432

425433
if (this.#dockerMode) {
426434
this.#logger.debug("Skipping cleanup in docker mode", metadata);
435+
removeCurrentAbortController();
427436
return;
428437
}
429438

@@ -436,11 +445,7 @@ export class Checkpointer {
436445
this.#logger.error("Error during cleanup", { ...metadata, error });
437446
}
438447

439-
// Ensure only the current controller is removed
440-
if (this.#abortControllers.get(runId) === controller) {
441-
this.#abortControllers.delete(runId);
442-
}
443-
controller.signal.removeEventListener("abort", onAbort);
448+
removeCurrentAbortController();
444449
};
445450

446451
try {

apps/coordinator/src/index.ts

Lines changed: 239 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
1414
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
1515
import { ChaosMonkey } from "./chaosMonkey";
1616
import { Checkpointer } from "./checkpointer";
17-
import { boolFromEnv, numFromEnv } from "./util";
17+
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";
1818

1919
import { collectDefaultMetrics, register, Gauge } from "prom-client";
2020
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
@@ -42,6 +42,8 @@ class CheckpointCancelError extends Error {}
4242

4343
class TaskCoordinator {
4444
#httpServer: ReturnType<typeof createServer>;
45+
#internalHttpServer: ReturnType<typeof createServer>;
46+
4547
#checkpointer = new Checkpointer({
4648
dockerMode: !process.env.KUBERNETES_PORT,
4749
forceSimulate: boolFromEnv("FORCE_CHECKPOINT_SIMULATION", false),
@@ -79,6 +81,8 @@ class TaskCoordinator {
7981
private host = "0.0.0.0"
8082
) {
8183
this.#httpServer = this.#createHttpServer();
84+
this.#internalHttpServer = this.#createInternalHttpServer();
85+
8286
this.#checkpointer.init();
8387
this.#platformSocket = this.#createPlatformSocket();
8488

@@ -653,11 +657,11 @@ class TaskCoordinator {
653657

654658
log.error("READY_FOR_LAZY_ATTEMPT error", { error });
655659

656-
await crashRun({
657-
name: "ReadyForLazyAttemptError",
658-
message:
659-
error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
660-
});
660+
// await crashRun({
661+
// name: "ReadyForLazyAttemptError",
662+
// message:
663+
// error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
664+
// });
661665

662666
return;
663667
}
@@ -1368,13 +1372,236 @@ class TaskCoordinator {
13681372
case "/metrics": {
13691373
return reply.text(await register.metrics(), 200, register.contentType);
13701374
}
1375+
default: {
1376+
return reply.empty(404);
1377+
}
1378+
}
1379+
});
1380+
1381+
httpServer.on("clientError", (err, socket) => {
1382+
socket.end("HTTP/1.1 400 Bad Request\r\n\r\n");
1383+
});
1384+
1385+
httpServer.on("listening", () => {
1386+
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
1387+
});
1388+
1389+
return httpServer;
1390+
}
1391+
1392+
#createInternalHttpServer() {
1393+
const httpServer = createServer(async (req, res) => {
1394+
logger.log(`[${req.method}]`, { url: req.url });
1395+
1396+
const reply = new HttpReply(res);
1397+
1398+
switch (req.url) {
13711399
case "/whoami": {
13721400
return reply.text(NODE_NAME);
13731401
}
1374-
case "/checkpoint": {
1375-
const body = await getTextBody(req);
1376-
// await this.#checkpointer.checkpointAndPush(body);
1377-
return reply.text(`sent restore request: ${body}`);
1402+
case "/checkpoint/duration": {
1403+
try {
1404+
const body = await getTextBody(req);
1405+
const json = safeJsonParse(body);
1406+
1407+
if (typeof json !== "object" || !json) {
1408+
return reply.text("Invalid body", 400);
1409+
}
1410+
1411+
if (!("runId" in json) || typeof json.runId !== "string") {
1412+
return reply.text("Missing or invalid: runId", 400);
1413+
}
1414+
1415+
if (!("now" in json) || typeof json.now !== "number") {
1416+
return reply.text("Missing or invalid: now", 400);
1417+
}
1418+
1419+
if (!("ms" in json) || typeof json.ms !== "number") {
1420+
return reply.text("Missing or invalid: ms", 400);
1421+
}
1422+
1423+
let keepRunAlive = false;
1424+
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
1425+
keepRunAlive = json.keepRunAlive;
1426+
}
1427+
1428+
let async = false;
1429+
if ("async" in json && typeof json.async === "boolean") {
1430+
async = json.async;
1431+
}
1432+
1433+
const { runId, now, ms } = json;
1434+
1435+
if (!runId) {
1436+
return reply.text("Missing runId", 400);
1437+
}
1438+
1439+
const runSocket = await this.#getRunSocket(runId);
1440+
if (!runSocket) {
1441+
return reply.text("Run socket not found", 404);
1442+
}
1443+
1444+
const { data } = runSocket;
1445+
1446+
console.log("Manual duration checkpoint", data);
1447+
1448+
if (async) {
1449+
reply.text("Creating checkpoint in the background", 202);
1450+
}
1451+
1452+
const checkpoint = await this.#checkpointer.checkpointAndPush({
1453+
runId: data.runId,
1454+
projectRef: data.projectRef,
1455+
deploymentVersion: data.deploymentVersion,
1456+
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
1457+
});
1458+
1459+
if (!checkpoint) {
1460+
return reply.text("Failed to checkpoint", 500);
1461+
}
1462+
1463+
if (!data.attemptFriendlyId) {
1464+
return reply.text("Socket data missing attemptFriendlyId", 500);
1465+
}
1466+
1467+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1468+
version: "v1",
1469+
runId,
1470+
attemptFriendlyId: data.attemptFriendlyId,
1471+
docker: checkpoint.docker,
1472+
location: checkpoint.location,
1473+
reason: {
1474+
type: "WAIT_FOR_DURATION",
1475+
ms,
1476+
now,
1477+
},
1478+
});
1479+
1480+
if (ack?.keepRunAlive || keepRunAlive) {
1481+
return reply.json({
1482+
message: `keeping run ${runId} alive after checkpoint`,
1483+
checkpoint,
1484+
requestJson: json,
1485+
platformAck: ack,
1486+
});
1487+
}
1488+
1489+
runSocket.emit("REQUEST_EXIT", {
1490+
version: "v1",
1491+
});
1492+
1493+
return reply.json({
1494+
message: `checkpoint created for run ${runId}`,
1495+
checkpoint,
1496+
requestJson: json,
1497+
platformAck: ack,
1498+
});
1499+
} catch (error) {
1500+
return reply.json({
1501+
message: `error`,
1502+
error,
1503+
});
1504+
}
1505+
}
1506+
case "/checkpoint/manual": {
1507+
try {
1508+
const body = await getTextBody(req);
1509+
const json = safeJsonParse(body);
1510+
1511+
if (typeof json !== "object" || !json) {
1512+
return reply.text("Invalid body", 400);
1513+
}
1514+
1515+
if (!("runId" in json) || typeof json.runId !== "string") {
1516+
return reply.text("Missing or invalid: runId", 400);
1517+
}
1518+
1519+
let restoreAtUnixTimeMs: number | undefined;
1520+
if ("restoreAtUnixTimeMs" in json && typeof json.restoreAtUnixTimeMs === "number") {
1521+
restoreAtUnixTimeMs = json.restoreAtUnixTimeMs;
1522+
}
1523+
1524+
let keepRunAlive = false;
1525+
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
1526+
keepRunAlive = json.keepRunAlive;
1527+
}
1528+
1529+
let async = false;
1530+
if ("async" in json && typeof json.async === "boolean") {
1531+
async = json.async;
1532+
}
1533+
1534+
const { runId } = json;
1535+
1536+
if (!runId) {
1537+
return reply.text("Missing runId", 400);
1538+
}
1539+
1540+
const runSocket = await this.#getRunSocket(runId);
1541+
if (!runSocket) {
1542+
return reply.text("Run socket not found", 404);
1543+
}
1544+
1545+
const { data } = runSocket;
1546+
1547+
console.log("Manual checkpoint", data);
1548+
1549+
if (async) {
1550+
reply.text("Creating checkpoint in the background", 202);
1551+
}
1552+
1553+
const checkpoint = await this.#checkpointer.checkpointAndPush({
1554+
runId: data.runId,
1555+
projectRef: data.projectRef,
1556+
deploymentVersion: data.deploymentVersion,
1557+
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
1558+
});
1559+
1560+
if (!checkpoint) {
1561+
return reply.text("Failed to checkpoint", 500);
1562+
}
1563+
1564+
if (!data.attemptFriendlyId) {
1565+
return reply.text("Socket data missing attemptFriendlyId", 500);
1566+
}
1567+
1568+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1569+
version: "v1",
1570+
runId,
1571+
attemptFriendlyId: data.attemptFriendlyId,
1572+
docker: checkpoint.docker,
1573+
location: checkpoint.location,
1574+
reason: {
1575+
type: "MANUAL",
1576+
restoreAtUnixTimeMs,
1577+
},
1578+
});
1579+
1580+
if (ack?.keepRunAlive || keepRunAlive) {
1581+
return reply.json({
1582+
message: `keeping run ${runId} alive after checkpoint`,
1583+
checkpoint,
1584+
requestJson: json,
1585+
platformAck: ack,
1586+
});
1587+
}
1588+
1589+
runSocket.emit("REQUEST_EXIT", {
1590+
version: "v1",
1591+
});
1592+
1593+
return reply.json({
1594+
message: `checkpoint created for run ${runId}`,
1595+
checkpoint,
1596+
requestJson: json,
1597+
platformAck: ack,
1598+
});
1599+
} catch (error) {
1600+
return reply.json({
1601+
message: `error`,
1602+
error,
1603+
});
1604+
}
13781605
}
13791606
default: {
13801607
return reply.empty(404);
@@ -1387,14 +1614,15 @@ class TaskCoordinator {
13871614
});
13881615

13891616
httpServer.on("listening", () => {
1390-
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
1617+
logger.log("internal server listening on port", { port: HTTP_SERVER_PORT + 100 });
13911618
});
13921619

13931620
return httpServer;
13941621
}
13951622

13961623
listen() {
13971624
this.#httpServer.listen(this.port, this.host);
1625+
this.#internalHttpServer.listen(this.port + 100, "127.0.0.1");
13981626
}
13991627
}
14001628

apps/coordinator/src/util.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,15 @@ export const numFromEnv = (env: string, defaultValue: number): number => {
1717

1818
return parseInt(value, 10);
1919
};
20+
21+
export function safeJsonParse(json?: string): unknown {
22+
if (!json) {
23+
return;
24+
}
25+
26+
try {
27+
return JSON.parse(json);
28+
} catch (e) {
29+
return null;
30+
}
31+
}

apps/webapp/app/database-types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,11 @@ export const RuntimeEnvironmentType = {
6969
DEVELOPMENT: "DEVELOPMENT",
7070
PREVIEW: "PREVIEW",
7171
} as const satisfies Record<RuntimeEnvironmentTypeType, RuntimeEnvironmentTypeType>;
72+
73+
export function isTaskRunAttemptStatus(value: string): value is keyof typeof TaskRunAttemptStatus {
74+
return Object.values(TaskRunAttemptStatus).includes(value as keyof typeof TaskRunAttemptStatus);
75+
}
76+
77+
export function isTaskRunStatus(value: string): value is keyof typeof TaskRunStatus {
78+
return Object.values(TaskRunStatus).includes(value as keyof typeof TaskRunStatus);
79+
}

0 commit comments

Comments
 (0)