Skip to content

Commit 2306217

Browse files
authored
v3: automatic pod cleanup (#1092)
* add automatic pod cleaner * fix task monitor namespace override
1 parent cfe4b54 commit 2306217

File tree

3 files changed

+279
-0
lines changed

3 files changed

+279
-0
lines changed

apps/kubernetes-provider/src/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
1111
import { randomUUID } from "crypto";
1212
import { TaskMonitor } from "./taskMonitor";
13+
import { PodCleaner } from "./podCleaner";
1314

1415
const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
1516
const NODE_NAME = process.env.NODE_NAME || "local";
@@ -543,3 +544,11 @@ const taskMonitor = new TaskMonitor({
543544
});
544545

545546
taskMonitor.start();
547+
548+
const podCleaner = new PodCleaner({
549+
runtimeEnv: RUNTIME_ENV,
550+
namespace: "default",
551+
intervalInSeconds: 300,
552+
});
553+
554+
podCleaner.start();
Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
import * as k8s from "@kubernetes/client-node";
2+
import { SimpleLogger } from "@trigger.dev/core-apps";
3+
4+
type PodCleanerOptions = {
5+
runtimeEnv: "local" | "kubernetes";
6+
namespace?: string;
7+
intervalInSeconds?: number;
8+
};
9+
10+
export class PodCleaner {
11+
private enabled = false;
12+
private namespace = "default";
13+
private intervalInSeconds = 300;
14+
15+
private logger = new SimpleLogger("[PodCleaner]");
16+
private k8sClient: {
17+
core: k8s.CoreV1Api;
18+
kubeConfig: k8s.KubeConfig;
19+
};
20+
21+
constructor(private opts: PodCleanerOptions) {
22+
if (opts.namespace) {
23+
this.namespace = opts.namespace;
24+
}
25+
26+
if (opts.intervalInSeconds) {
27+
this.intervalInSeconds = opts.intervalInSeconds;
28+
}
29+
30+
this.k8sClient = this.#createK8sClient();
31+
}
32+
33+
#createK8sClient() {
34+
const kubeConfig = new k8s.KubeConfig();
35+
36+
if (this.opts.runtimeEnv === "local") {
37+
kubeConfig.loadFromDefault();
38+
} else if (this.opts.runtimeEnv === "kubernetes") {
39+
kubeConfig.loadFromCluster();
40+
} else {
41+
throw new Error(`Unsupported runtime environment: ${this.opts.runtimeEnv}`);
42+
}
43+
44+
return {
45+
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
46+
kubeConfig: kubeConfig,
47+
};
48+
}
49+
50+
#isRecord(candidate: unknown): candidate is Record<string, unknown> {
51+
if (typeof candidate !== "object" || candidate === null) {
52+
return false;
53+
} else {
54+
return true;
55+
}
56+
}
57+
58+
#logK8sError(err: unknown, debugOnly = false) {
59+
if (debugOnly) {
60+
this.logger.debug("K8s API Error", err);
61+
} else {
62+
this.logger.error("K8s API Error", err);
63+
}
64+
}
65+
66+
#handleK8sError(err: unknown) {
67+
if (!this.#isRecord(err) || !this.#isRecord(err.body)) {
68+
this.#logK8sError(err);
69+
return;
70+
}
71+
72+
this.#logK8sError(err, true);
73+
74+
if (typeof err.body.message === "string") {
75+
this.#logK8sError({ message: err.body.message });
76+
return;
77+
}
78+
79+
this.#logK8sError({ body: err.body });
80+
}
81+
82+
async #deletePods(opts: {
83+
namespace: string;
84+
dryRun?: boolean;
85+
fieldSelector?: string;
86+
labelSelector?: string;
87+
}) {
88+
return await this.k8sClient.core
89+
.deleteCollectionNamespacedPod(
90+
opts.namespace,
91+
undefined, // pretty
92+
undefined, // continue
93+
opts.dryRun ? "All" : undefined,
94+
opts.fieldSelector,
95+
undefined, // gracePeriodSeconds
96+
opts.labelSelector
97+
)
98+
.catch(this.#handleK8sError.bind(this));
99+
}
100+
101+
async #deleteCompletedRuns() {
102+
this.logger.log("Deleting completed runs");
103+
104+
const start = Date.now();
105+
106+
const result = await this.#deletePods({
107+
namespace: this.namespace,
108+
fieldSelector: "status.phase=Succeeded",
109+
labelSelector: "app=task-run",
110+
});
111+
112+
const elapsedMs = Date.now() - start;
113+
114+
if (!result) {
115+
this.logger.log("Deleting completed runs: No delete result", { elapsedMs });
116+
return;
117+
}
118+
119+
const total = (result.response as any)?.body?.items?.length ?? 0;
120+
121+
this.logger.log("Deleting completed runs: Done", { total, elapsedMs });
122+
}
123+
124+
async #deleteFailedRuns() {
125+
this.logger.log("Deleting failed runs");
126+
127+
const start = Date.now();
128+
129+
const result = await this.#deletePods({
130+
namespace: this.namespace,
131+
fieldSelector: "status.phase=Failed",
132+
labelSelector: "app=task-run",
133+
});
134+
135+
const elapsedMs = Date.now() - start;
136+
137+
if (!result) {
138+
this.logger.log("Deleting failed runs: No delete result", { elapsedMs });
139+
return;
140+
}
141+
142+
const total = (result.response as any)?.body?.items?.length ?? 0;
143+
144+
this.logger.log("Deleting failed runs: Done", { total, elapsedMs });
145+
}
146+
147+
async #deleteUnrecoverableRuns() {
148+
await this.#deletePods({
149+
namespace: this.namespace,
150+
fieldSelector: "status.phase=?",
151+
labelSelector: "app=task-run",
152+
});
153+
}
154+
155+
async start() {
156+
this.enabled = true;
157+
this.logger.log("Starting");
158+
159+
const completedInterval = setInterval(async () => {
160+
if (!this.enabled) {
161+
clearInterval(completedInterval);
162+
return;
163+
}
164+
165+
try {
166+
await this.#deleteCompletedRuns();
167+
} catch (error) {
168+
this.logger.error("Error deleting completed runs", error);
169+
}
170+
}, this.intervalInSeconds * 1000);
171+
172+
const failedInterval = setInterval(
173+
async () => {
174+
if (!this.enabled) {
175+
clearInterval(failedInterval);
176+
return;
177+
}
178+
179+
try {
180+
await this.#deleteFailedRuns();
181+
} catch (error) {
182+
this.logger.error("Error deleting completed runs", error);
183+
}
184+
},
185+
// Use a longer interval for failed runs. This is only a backup in case the task monitor fails.
186+
2 * this.intervalInSeconds * 1000
187+
);
188+
189+
// this.#launchTests();
190+
}
191+
192+
async stop() {
193+
if (!this.enabled) {
194+
return;
195+
}
196+
197+
this.enabled = false;
198+
this.logger.log("Shutting down..");
199+
}
200+
201+
async #launchTests() {
202+
const createPod = async (
203+
container: k8s.V1Container,
204+
name: string,
205+
labels?: Record<string, string>
206+
) => {
207+
this.logger.log("Creating pod:", name);
208+
209+
const pod = {
210+
metadata: {
211+
name,
212+
labels,
213+
},
214+
spec: {
215+
restartPolicy: "Never",
216+
automountServiceAccountToken: false,
217+
terminationGracePeriodSeconds: 1,
218+
containers: [container],
219+
},
220+
} satisfies k8s.V1Pod;
221+
222+
await this.k8sClient.core
223+
.createNamespacedPod(this.namespace, pod)
224+
.catch(this.#handleK8sError.bind(this));
225+
};
226+
227+
const createIdlePod = async (name: string, labels?: Record<string, string>) => {
228+
const container = {
229+
name,
230+
image: "docker.io/library/busybox",
231+
command: ["sh"],
232+
args: ["-c", "sleep infinity"],
233+
} satisfies k8s.V1Container;
234+
235+
await createPod(container, name, labels);
236+
};
237+
238+
const createCompletedPod = async (name: string, labels?: Record<string, string>) => {
239+
const container = {
240+
name,
241+
image: "docker.io/library/busybox",
242+
command: ["sh"],
243+
args: ["-c", "true"],
244+
} satisfies k8s.V1Container;
245+
246+
await createPod(container, name, labels);
247+
};
248+
249+
const createFailedPod = async (name: string, labels?: Record<string, string>) => {
250+
const container = {
251+
name,
252+
image: "docker.io/library/busybox",
253+
command: ["sh"],
254+
args: ["-c", "false"],
255+
} satisfies k8s.V1Container;
256+
257+
await createPod(container, name, labels);
258+
};
259+
260+
await createIdlePod("test-idle-1", { app: "task-run" });
261+
await createFailedPod("test-failed-1", { app: "task-run" });
262+
await createCompletedPod("test-completed-1", { app: "task-run" });
263+
}
264+
}

apps/kubernetes-provider/src/taskMonitor.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ type TaskMonitorOptions = {
3030

3131
export class TaskMonitor {
3232
#enabled = false;
33+
3334
#logger = new SimpleLogger("[TaskMonitor]");
3435
#taskInformer: ReturnType<typeof k8s.makeInformer<k8s.V1Pod>>;
3536
#processedPods = new Map<string, number>();
3637
#queue = new PQueue({ concurrency: 10 });
38+
3739
#k8sClient: {
3840
core: k8s.CoreV1Api;
3941
kubeConfig: k8s.KubeConfig;
@@ -44,6 +46,10 @@ export class TaskMonitor {
4446
private labelSelector = "app in (task-index, task-run)";
4547

4648
constructor(private opts: TaskMonitorOptions) {
49+
if (opts.namespace) {
50+
this.namespace = opts.namespace;
51+
}
52+
4753
this.#k8sClient = this.#createK8sClient();
4854

4955
this.#taskInformer = this.#createTaskInformer();

0 commit comments

Comments
 (0)