Skip to content

v3: automatic pod cleanup #1092

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
import { randomUUID } from "crypto";
import { TaskMonitor } from "./taskMonitor";
import { PodCleaner } from "./podCleaner";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
const NODE_NAME = process.env.NODE_NAME || "local";
Expand Down Expand Up @@ -543,3 +544,11 @@ const taskMonitor = new TaskMonitor({
});

taskMonitor.start();

const podCleaner = new PodCleaner({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
intervalInSeconds: 300,
});

podCleaner.start();
264 changes: 264 additions & 0 deletions apps/kubernetes-provider/src/podCleaner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
import * as k8s from "@kubernetes/client-node";
import { SimpleLogger } from "@trigger.dev/core-apps";

type PodCleanerOptions = {
runtimeEnv: "local" | "kubernetes";
namespace?: string;
intervalInSeconds?: number;
};

export class PodCleaner {
private enabled = false;
private namespace = "default";
private intervalInSeconds = 300;

private logger = new SimpleLogger("[PodCleaner]");
private k8sClient: {
core: k8s.CoreV1Api;
kubeConfig: k8s.KubeConfig;
};

constructor(private opts: PodCleanerOptions) {
if (opts.namespace) {
this.namespace = opts.namespace;
}

if (opts.intervalInSeconds) {
this.intervalInSeconds = opts.intervalInSeconds;
}

this.k8sClient = this.#createK8sClient();
}

#createK8sClient() {
const kubeConfig = new k8s.KubeConfig();

if (this.opts.runtimeEnv === "local") {
kubeConfig.loadFromDefault();
} else if (this.opts.runtimeEnv === "kubernetes") {
kubeConfig.loadFromCluster();
} else {
throw new Error(`Unsupported runtime environment: ${this.opts.runtimeEnv}`);
}

return {
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
kubeConfig: kubeConfig,
};
}

#isRecord(candidate: unknown): candidate is Record<string, unknown> {
if (typeof candidate !== "object" || candidate === null) {
return false;
} else {
return true;
}
}

#logK8sError(err: unknown, debugOnly = false) {
if (debugOnly) {
this.logger.debug("K8s API Error", err);
} else {
this.logger.error("K8s API Error", err);
}
}

#handleK8sError(err: unknown) {
if (!this.#isRecord(err) || !this.#isRecord(err.body)) {
this.#logK8sError(err);
return;
}

this.#logK8sError(err, true);

if (typeof err.body.message === "string") {
this.#logK8sError({ message: err.body.message });
return;
}

this.#logK8sError({ body: err.body });
}

async #deletePods(opts: {
namespace: string;
dryRun?: boolean;
fieldSelector?: string;
labelSelector?: string;
}) {
return await this.k8sClient.core
.deleteCollectionNamespacedPod(
opts.namespace,
undefined, // pretty
undefined, // continue
opts.dryRun ? "All" : undefined,
opts.fieldSelector,
undefined, // gracePeriodSeconds
opts.labelSelector
)
.catch(this.#handleK8sError.bind(this));
}

async #deleteCompletedRuns() {
this.logger.log("Deleting completed runs");

const start = Date.now();

const result = await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=Succeeded",
labelSelector: "app=task-run",
});

const elapsedMs = Date.now() - start;

if (!result) {
this.logger.log("Deleting completed runs: No delete result", { elapsedMs });
return;
}

const total = (result.response as any)?.body?.items?.length ?? 0;

this.logger.log("Deleting completed runs: Done", { total, elapsedMs });
}

async #deleteFailedRuns() {
this.logger.log("Deleting failed runs");

const start = Date.now();

const result = await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=Failed",
labelSelector: "app=task-run",
});

const elapsedMs = Date.now() - start;

if (!result) {
this.logger.log("Deleting failed runs: No delete result", { elapsedMs });
return;
}

const total = (result.response as any)?.body?.items?.length ?? 0;

this.logger.log("Deleting failed runs: Done", { total, elapsedMs });
}

async #deleteUnrecoverableRuns() {
await this.#deletePods({
namespace: this.namespace,
fieldSelector: "status.phase=?",
labelSelector: "app=task-run",
});
}

async start() {
this.enabled = true;
this.logger.log("Starting");

const completedInterval = setInterval(async () => {
if (!this.enabled) {
clearInterval(completedInterval);
return;
}

try {
await this.#deleteCompletedRuns();
} catch (error) {
this.logger.error("Error deleting completed runs", error);
}
}, this.intervalInSeconds * 1000);

const failedInterval = setInterval(
async () => {
if (!this.enabled) {
clearInterval(failedInterval);
return;
}

try {
await this.#deleteFailedRuns();
} catch (error) {
this.logger.error("Error deleting completed runs", error);
}
},
// Use a longer interval for failed runs. This is only a backup in case the task monitor fails.
2 * this.intervalInSeconds * 1000
);

// this.#launchTests();
}

async stop() {
if (!this.enabled) {
return;
}

this.enabled = false;
this.logger.log("Shutting down..");
}

async #launchTests() {
const createPod = async (
container: k8s.V1Container,
name: string,
labels?: Record<string, string>
) => {
this.logger.log("Creating pod:", name);

const pod = {
metadata: {
name,
labels,
},
spec: {
restartPolicy: "Never",
automountServiceAccountToken: false,
terminationGracePeriodSeconds: 1,
containers: [container],
},
} satisfies k8s.V1Pod;

await this.k8sClient.core
.createNamespacedPod(this.namespace, pod)
.catch(this.#handleK8sError.bind(this));
};

const createIdlePod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "sleep infinity"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

const createCompletedPod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "true"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

const createFailedPod = async (name: string, labels?: Record<string, string>) => {
const container = {
name,
image: "docker.io/library/busybox",
command: ["sh"],
args: ["-c", "false"],
} satisfies k8s.V1Container;

await createPod(container, name, labels);
};

await createIdlePod("test-idle-1", { app: "task-run" });
await createFailedPod("test-failed-1", { app: "task-run" });
await createCompletedPod("test-completed-1", { app: "task-run" });
}
}
6 changes: 6 additions & 0 deletions apps/kubernetes-provider/src/taskMonitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type TaskMonitorOptions = {

export class TaskMonitor {
#enabled = false;

#logger = new SimpleLogger("[TaskMonitor]");
#taskInformer: ReturnType<typeof k8s.makeInformer<k8s.V1Pod>>;
#processedPods = new Map<string, number>();
#queue = new PQueue({ concurrency: 10 });

#k8sClient: {
core: k8s.CoreV1Api;
kubeConfig: k8s.KubeConfig;
Expand All @@ -44,6 +46,10 @@ export class TaskMonitor {
private labelSelector = "app in (task-index, task-run)";

constructor(private opts: TaskMonitorOptions) {
if (opts.namespace) {
this.namespace = opts.namespace;
}

this.#k8sClient = this.#createK8sClient();

this.#taskInformer = this.#createTaskInformer();
Expand Down
Loading