Skip to content

Add support for manual checkpoints #1709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 14, 2025
5 changes: 5 additions & 0 deletions .changeset/wise-mirrors-hug.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add manual checkpoint schema
2 changes: 1 addition & 1 deletion apps/coordinator/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ RUN find . -name "node_modules" -type d -prune -exec rm -rf '{}' +
FROM node-20 AS base

RUN apt-get update \
&& apt-get install -y buildah ca-certificates dumb-init docker.io \
&& apt-get install -y buildah ca-certificates dumb-init docker.io busybox \
&& rm -rf /var/lib/apt/lists/*

COPY --chown=node:node .gitignore .gitignore
Expand Down
15 changes: 10 additions & 5 deletions apps/coordinator/src/checkpointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,14 @@ export class Checkpointer {
const buildah = new Buildah({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });
const crictl = new Crictl({ id: `${runId}-${shortCode}`, abortSignal: controller.signal });

const removeCurrentAbortController = () => {
// Ensure only the current controller is removed
if (this.#abortControllers.get(runId) === controller) {
this.#abortControllers.delete(runId);
}
controller.signal.removeEventListener("abort", onAbort);
};

const cleanup = async () => {
const metadata = {
runId,
Expand All @@ -424,6 +432,7 @@ export class Checkpointer {

if (this.#dockerMode) {
this.#logger.debug("Skipping cleanup in docker mode", metadata);
removeCurrentAbortController();
return;
}

Expand All @@ -436,11 +445,7 @@ export class Checkpointer {
this.#logger.error("Error during cleanup", { ...metadata, error });
}

// Ensure only the current controller is removed
if (this.#abortControllers.get(runId) === controller) {
this.#abortControllers.delete(runId);
}
controller.signal.removeEventListener("abort", onAbort);
removeCurrentAbortController();
};

try {
Expand Down
250 changes: 239 additions & 11 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { ZodSocketConnection } from "@trigger.dev/core/v3/zodSocket";
import { HttpReply, getTextBody } from "@trigger.dev/core/v3/apps";
import { ChaosMonkey } from "./chaosMonkey";
import { Checkpointer } from "./checkpointer";
import { boolFromEnv, numFromEnv } from "./util";
import { boolFromEnv, numFromEnv, safeJsonParse } from "./util";

import { collectDefaultMetrics, register, Gauge } from "prom-client";
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
Expand Down Expand Up @@ -42,6 +42,8 @@ class CheckpointCancelError extends Error {}

class TaskCoordinator {
#httpServer: ReturnType<typeof createServer>;
#internalHttpServer: ReturnType<typeof createServer>;

#checkpointer = new Checkpointer({
dockerMode: !process.env.KUBERNETES_PORT,
forceSimulate: boolFromEnv("FORCE_CHECKPOINT_SIMULATION", false),
Expand Down Expand Up @@ -79,6 +81,8 @@ class TaskCoordinator {
private host = "0.0.0.0"
) {
this.#httpServer = this.#createHttpServer();
this.#internalHttpServer = this.#createInternalHttpServer();

this.#checkpointer.init();
this.#platformSocket = this.#createPlatformSocket();

Expand Down Expand Up @@ -653,11 +657,11 @@ class TaskCoordinator {

log.error("READY_FOR_LAZY_ATTEMPT error", { error });

await crashRun({
name: "ReadyForLazyAttemptError",
message:
error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
});
// await crashRun({
// name: "ReadyForLazyAttemptError",
// message:
// error instanceof Error ? `Unexpected error: ${error.message}` : "Unexpected error",
// });

return;
}
Expand Down Expand Up @@ -1368,13 +1372,236 @@ class TaskCoordinator {
case "/metrics": {
return reply.text(await register.metrics(), 200, register.contentType);
}
default: {
return reply.empty(404);
}
}
});

httpServer.on("clientError", (err, socket) => {
socket.end("HTTP/1.1 400 Bad Request\r\n\r\n");
});

httpServer.on("listening", () => {
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
});

return httpServer;
}

#createInternalHttpServer() {
const httpServer = createServer(async (req, res) => {
logger.log(`[${req.method}]`, { url: req.url });

const reply = new HttpReply(res);

switch (req.url) {
case "/whoami": {
return reply.text(NODE_NAME);
}
case "/checkpoint": {
const body = await getTextBody(req);
// await this.#checkpointer.checkpointAndPush(body);
return reply.text(`sent restore request: ${body}`);
case "/checkpoint/duration": {
try {
const body = await getTextBody(req);
const json = safeJsonParse(body);

if (typeof json !== "object" || !json) {
return reply.text("Invalid body", 400);
}

if (!("runId" in json) || typeof json.runId !== "string") {
return reply.text("Missing or invalid: runId", 400);
}

if (!("now" in json) || typeof json.now !== "number") {
return reply.text("Missing or invalid: now", 400);
}

if (!("ms" in json) || typeof json.ms !== "number") {
return reply.text("Missing or invalid: ms", 400);
}

let keepRunAlive = false;
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
keepRunAlive = json.keepRunAlive;
}

let async = false;
if ("async" in json && typeof json.async === "boolean") {
async = json.async;
}

const { runId, now, ms } = json;

if (!runId) {
return reply.text("Missing runId", 400);
}

const runSocket = await this.#getRunSocket(runId);
if (!runSocket) {
return reply.text("Run socket not found", 404);
}

const { data } = runSocket;

console.log("Manual duration checkpoint", data);

if (async) {
reply.text("Creating checkpoint in the background", 202);
}

const checkpoint = await this.#checkpointer.checkpointAndPush({
runId: data.runId,
projectRef: data.projectRef,
deploymentVersion: data.deploymentVersion,
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
});

if (!checkpoint) {
return reply.text("Failed to checkpoint", 500);
}

if (!data.attemptFriendlyId) {
return reply.text("Socket data missing attemptFriendlyId", 500);
}

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
runId,
attemptFriendlyId: data.attemptFriendlyId,
docker: checkpoint.docker,
location: checkpoint.location,
reason: {
type: "WAIT_FOR_DURATION",
ms,
now,
},
});

if (ack?.keepRunAlive || keepRunAlive) {
return reply.json({
message: `keeping run ${runId} alive after checkpoint`,
checkpoint,
requestJson: json,
platformAck: ack,
});
}

runSocket.emit("REQUEST_EXIT", {
version: "v1",
});

return reply.json({
message: `checkpoint created for run ${runId}`,
checkpoint,
requestJson: json,
platformAck: ack,
});
} catch (error) {
return reply.json({
message: `error`,
error,
});
}
}
case "/checkpoint/manual": {
try {
const body = await getTextBody(req);
const json = safeJsonParse(body);

if (typeof json !== "object" || !json) {
return reply.text("Invalid body", 400);
}

if (!("runId" in json) || typeof json.runId !== "string") {
return reply.text("Missing or invalid: runId", 400);
}

let restoreAtUnixTimeMs: number | undefined;
if ("restoreAtUnixTimeMs" in json && typeof json.restoreAtUnixTimeMs === "number") {
restoreAtUnixTimeMs = json.restoreAtUnixTimeMs;
}

let keepRunAlive = false;
if ("keepRunAlive" in json && typeof json.keepRunAlive === "boolean") {
keepRunAlive = json.keepRunAlive;
}

let async = false;
if ("async" in json && typeof json.async === "boolean") {
async = json.async;
}

const { runId } = json;

if (!runId) {
return reply.text("Missing runId", 400);
}

const runSocket = await this.#getRunSocket(runId);
if (!runSocket) {
return reply.text("Run socket not found", 404);
}

const { data } = runSocket;

console.log("Manual checkpoint", data);

if (async) {
reply.text("Creating checkpoint in the background", 202);
}

const checkpoint = await this.#checkpointer.checkpointAndPush({
runId: data.runId,
projectRef: data.projectRef,
deploymentVersion: data.deploymentVersion,
attemptNumber: data.attemptNumber ? parseInt(data.attemptNumber) : undefined,
});

if (!checkpoint) {
return reply.text("Failed to checkpoint", 500);
}

if (!data.attemptFriendlyId) {
return reply.text("Socket data missing attemptFriendlyId", 500);
}

const ack = await this.#platformSocket?.sendWithAck("CHECKPOINT_CREATED", {
version: "v1",
runId,
attemptFriendlyId: data.attemptFriendlyId,
docker: checkpoint.docker,
location: checkpoint.location,
reason: {
type: "MANUAL",
restoreAtUnixTimeMs,
},
});

if (ack?.keepRunAlive || keepRunAlive) {
return reply.json({
message: `keeping run ${runId} alive after checkpoint`,
checkpoint,
requestJson: json,
platformAck: ack,
});
}

runSocket.emit("REQUEST_EXIT", {
version: "v1",
});

return reply.json({
message: `checkpoint created for run ${runId}`,
checkpoint,
requestJson: json,
platformAck: ack,
});
} catch (error) {
return reply.json({
message: `error`,
error,
});
}
}
default: {
return reply.empty(404);
Expand All @@ -1387,14 +1614,15 @@ class TaskCoordinator {
});

httpServer.on("listening", () => {
logger.log("server listening on port", { port: HTTP_SERVER_PORT });
logger.log("internal server listening on port", { port: HTTP_SERVER_PORT + 100 });
});

return httpServer;
}

listen() {
this.#httpServer.listen(this.port, this.host);
this.#internalHttpServer.listen(this.port + 100, "127.0.0.1");
}
}

Expand Down
12 changes: 12 additions & 0 deletions apps/coordinator/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,15 @@ export const numFromEnv = (env: string, defaultValue: number): number => {

return parseInt(value, 10);
};

export function safeJsonParse(json?: string): unknown {
if (!json) {
return;
}

try {
return JSON.parse(json);
} catch (e) {
return null;
}
}
8 changes: 8 additions & 0 deletions apps/webapp/app/database-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,11 @@ export const RuntimeEnvironmentType = {
DEVELOPMENT: "DEVELOPMENT",
PREVIEW: "PREVIEW",
} as const satisfies Record<RuntimeEnvironmentTypeType, RuntimeEnvironmentTypeType>;

export function isTaskRunAttemptStatus(value: string): value is keyof typeof TaskRunAttemptStatus {
return Object.values(TaskRunAttemptStatus).includes(value as keyof typeof TaskRunAttemptStatus);
}

export function isTaskRunStatus(value: string): value is keyof typeof TaskRunStatus {
return Object.values(TaskRunStatus).includes(value as keyof typeof TaskRunStatus);
}
Loading
Loading