Skip to content

Commit 899ca3f

Browse files
committed
duration checkpoints
1 parent 4a0976d commit 899ca3f

File tree

2 files changed

+135
-6
lines changed

2 files changed

+135
-6
lines changed

apps/coordinator/src/index.ts

Lines changed: 123 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
1414
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
1515
import { ChaosMonkey } from "./chaosMonkey";
1616
import { Checkpointer } from "./checkpointer";
17-
import { boolFromEnv, numFromEnv } from "./util";
17+
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";
1818

1919
import { collectDefaultMetrics, register, Gauge } from "prom-client";
2020
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
@@ -42,6 +42,8 @@ class CheckpointCancelError extends Error {}
4242

4343
class TaskCoordinator {
4444
#httpServer: ReturnType<typeof createServer>;
45+
#internalHttpServer: ReturnType<typeof createServer>;
46+
4547
#checkpointer = new Checkpointer({
4648
dockerMode: !process.env.KUBERNETES_PORT,
4749
forceSimulate: boolFromEnv("FORCE_CHECKPOINT_SIMULATION", false),
@@ -79,6 +81,8 @@ class TaskCoordinator {
7981
private host = "0.0.0.0"
8082
) {
8183
this.#httpServer = this.#createHttpServer();
84+
this.#internalHttpServer = this.#createInternalHttpServer();
85+
8286
this.#checkpointer.init();
8387
this.#platformSocket = this.#createPlatformSocket();
8488

@@ -1368,13 +1372,125 @@ class TaskCoordinator {
13681372
case "/metrics": {
13691373
return reply.text(await register.metrics(), 200, register.contentType);
13701374
}
1375+
default: {
1376+
return reply.empty(404);
1377+
}
1378+
}
1379+
});
1380+
1381+
httpServer.on("clientError", (err, socket) => {
1382+
socket.end("HTTP/1.1 400 Bad Request\r\n\r\n");
1383+
});
1384+
1385+
httpServer.on("listening", () => {
1386+
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
1387+
});
1388+
1389+
return httpServer;
1390+
}
1391+
1392+
#createInternalHttpServer() {
1393+
const httpServer = createServer(async (req, res) => {
1394+
logger.log(`[${req.method}]`, { url: req.url });
1395+
1396+
const reply = new HttpReply(res);
1397+
1398+
switch (req.url) {
13711399
case "/whoami": {
13721400
return reply.text(NODE_NAME);
13731401
}
1374-
case "/checkpoint": {
1375-
const body = await getTextBody(req);
1376-
// await this.#checkpointer.checkpointAndPush(body);
1377-
return reply.text(`sent restore request: ${body}`);
1402+
case "/checkpoint/duration": {
1403+
try {
1404+
const body = await getTextBody(req);
1405+
const json = safeJsonParse(body);
1406+
1407+
if (typeof json !== "object" || !json) {
1408+
return reply.text("Invalid body", 400);
1409+
}
1410+
1411+
if (!("runId" in json) || typeof json.runId !== "string") {
1412+
return reply.text("Missing or invalid: runId", 400);
1413+
}
1414+
1415+
if (!("now" in json) || typeof json.now !== "number") {
1416+
return reply.text("Missing or invalid: now", 400);
1417+
}
1418+
1419+
if (!("ms" in json) || typeof json.ms !== "number") {
1420+
return reply.text("Missing or invalid: ms", 400);
1421+
}
1422+
1423+
let keepRunAlive = false;
1424+
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
1425+
keepRunAlive = json.keepRunAlive;
1426+
}
1427+
1428+
const { runId, now, ms } = json;
1429+
1430+
if (!runId) {
1431+
return reply.text("Missing runId", 400);
1432+
}
1433+
1434+
const runSocket = await this.#getRunSocket(runId);
1435+
if (!runSocket) {
1436+
return reply.text("Run socket not found", 404);
1437+
}
1438+
1439+
const { data } = runSocket;
1440+
1441+
console.log("Manual duration checkpoint", data);
1442+
1443+
const checkpoint = await this.#checkpointer.checkpointAndPush({
1444+
runId: data.runId,
1445+
projectRef: data.projectRef,
1446+
deploymentVersion: data.deploymentVersion,
1447+
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
1448+
});
1449+
1450+
if (!checkpoint) {
1451+
return reply.text("Failed to checkpoint", 500);
1452+
}
1453+
1454+
if (!data.attemptFriendlyId) {
1455+
return reply.text("Socket data missing attemptFriendlyId", 500);
1456+
}
1457+
1458+
const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
1459+
version: "v1",
1460+
runId,
1461+
attemptFriendlyId: data.attemptFriendlyId,
1462+
docker: checkpoint.docker,
1463+
location: checkpoint.location,
1464+
reason: {
1465+
type: "WAIT_FOR_DURATION",
1466+
ms,
1467+
now,
1468+
},
1469+
});
1470+
1471+
if (ack?.keepRunAlive || keepRunAlive) {
1472+
return reply.json({
1473+
message: `keeping run ${runId} alive after checkpoint`,
1474+
checkpoint,
1475+
requestJson: json,
1476+
});
1477+
}
1478+
1479+
runSocket.emit("REQUEST_EXIT", {
1480+
version: "v1",
1481+
});
1482+
1483+
return reply.json({
1484+
message: `checkpoint created for run ${runId}`,
1485+
checkpoint,
1486+
requestJson: json,
1487+
});
1488+
} catch (error) {
1489+
return reply.json({
1490+
message: `error`,
1491+
error,
1492+
});
1493+
}
13781494
}
13791495
default: {
13801496
return reply.empty(404);
@@ -1387,14 +1503,15 @@ class TaskCoordinator {
13871503
});
13881504

13891505
httpServer.on("listening", () => {
1390-
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
1506+
logger.log("internal server listening on port", { port: HTTP_SERVER_PORT + 100 });
13911507
});
13921508

13931509
return httpServer;
13941510
}
13951511

13961512
listen() {
13971513
this.#httpServer.listen(this.port, this.host);
1514+
this.#internalHttpServer.listen(this.port + 100, "127.0.0.1");
13981515
}
13991516
}
14001517

apps/coordinator/src/util.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,15 @@ export const numFromEnv = (env: string, defaultValue: number): number => {
1717

1818
return parseInt(value, 10);
1919
};
20+
21+
export function safeJsonParse(json?: string): unknown {
22+
if (!json) {
23+
return;
24+
}
25+
26+
try {
27+
return JSON.parse(json);
28+
} catch (e) {
29+
return null;
30+
}
31+
}

0 commit comments

Comments
 (0)