Skip to content

Commit 1e667ec

Browse files
authored
v3: cluster uptime heartbeat (#1194)
* add basic uptime heartbeat * add more heartbeat metrics
1 parent 7e97dcb commit 1e667ec

File tree

2 files changed

+296
-0
lines changed

2 files changed

+296
-0
lines changed

apps/kubernetes-provider/src/index.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,21 @@ import {
1616
import { randomUUID } from "crypto";
1717
import { TaskMonitor } from "./taskMonitor";
1818
import { PodCleaner } from "./podCleaner";
19+
import { UptimeHeartbeat } from "./uptimeHeartbeat";
1920

2021
// Runtime mode: "kubernetes" when the KUBERNETES_PORT env var is set to a
// non-empty value, otherwise "local".
const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
2122
// Node name, used below as the logger prefix; falls back to "local" when
// NODE_NAME is unset or empty.
const NODE_NAME = process.env.NODE_NAME || "local";
2223
// OTLP exporter endpoint; the default applies only when the env var is
// undefined (`??`), so an empty string is kept as-is.
const OTEL_EXPORTER_OTLP_ENDPOINT =
2324
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318";
25+
2426
// Pod cleaner interval in seconds (default 300).
// NOTE(review): Number() yields NaN for a non-numeric value; no validation here.
const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300");
2527

28+
// Uptime heartbeat settings. The heartbeat is only started further down in
// this file when UPTIME_HEARTBEAT_URL is set.
const UPTIME_HEARTBEAT_URL = process.env.UPTIME_HEARTBEAT_URL;
29+
// Seconds between heartbeats (default 60).
const UPTIME_INTERVAL_SECONDS = Number(process.env.UPTIME_INTERVAL_SECONDS || "60");
30+
// Thresholds passed to UptimeHeartbeat as maxPendingRuns / maxPendingIndeces /
// maxPendingErrors (defaults 25 / 10 / 10).
// NOTE(review): as above, a non-numeric env value becomes NaN via Number().
const UPTIME_MAX_PENDING_RUNS = Number(process.env.UPTIME_MAX_PENDING_RUNS || "25");
31+
const UPTIME_MAX_PENDING_INDECES = Number(process.env.UPTIME_MAX_PENDING_INDECES || "10");
32+
const UPTIME_MAX_PENDING_ERRORS = Number(process.env.UPTIME_MAX_PENDING_ERRORS || "10");
33+
2634
// Process-wide logger, prefixed with the node name in square brackets.
const logger = new SimpleLogger(`[${NODE_NAME}]`);
2735
logger.log(`running in ${RUNTIME_ENV} mode`);
2836

@@ -566,3 +574,19 @@ const podCleaner = new PodCleaner({
566574
});
567575

568576
podCleaner.start();
577+
578+
// The uptime heartbeat is opt-in: without a ping URL we only log how to
// enable it; otherwise we build the heartbeat from the UPTIME_* settings and
// start it right away.
if (!UPTIME_HEARTBEAT_URL) {
  logger.log("Uptime heartbeat is disabled, set UPTIME_HEARTBEAT_URL to enable.");
} else {
  new UptimeHeartbeat({
    pingUrl: UPTIME_HEARTBEAT_URL,
    runtimeEnv: RUNTIME_ENV,
    namespace: "default",
    intervalInSeconds: UPTIME_INTERVAL_SECONDS,
    maxPendingRuns: UPTIME_MAX_PENDING_RUNS,
    maxPendingIndeces: UPTIME_MAX_PENDING_INDECES,
    maxPendingErrors: UPTIME_MAX_PENDING_ERRORS,
  }).start();
}
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
import * as k8s from "@kubernetes/client-node";
2+
import { SimpleLogger } from "@trigger.dev/core-apps";
3+
4+
/** Constructor options for UptimeHeartbeat. */
type UptimeHeartbeatOptions = {
5+
// Selects how the kube config is loaded: "local" uses loadFromDefault(),
// "kubernetes" uses loadFromCluster().
runtimeEnv: "local" | "kubernetes";
6+
// URL that is fetched when a heartbeat passes all checks.
pingUrl: string;
7+
// Namespace whose pods are listed; the constructor defaults it to "default".
namespace?: string;
8+
// Seconds between heartbeats; the constructor defaults it to 60.
intervalInSeconds?: number;
9+
// Pending "app=task-run" pods above this count skip the ping (default 25).
maxPendingRuns?: number;
10+
// Pending "app=task-index" pods above this count skip the ping (default 10).
maxPendingIndeces?: number;
11+
// Pending task pods waiting on CreateContainerError/RunContainerError above
// this count skip the ping (default 10).
maxPendingErrors?: number;
12+
// Corresponds to the class's `leadingEdge` flag, which makes start() run one
// heartbeat immediately instead of waiting for the first interval.
leadingEdge?: boolean;
13+
};
14+
15+
/**
 * Periodically inspects pending task pods in a Kubernetes namespace and, when
 * the counts are within the configured thresholds, sends an HTTP request to
 * `pingUrl`. When a threshold is exceeded (or the pod listing fails) the ping
 * is skipped for that round.
 *
 * Lifecycle: call `start()` once to begin, `stop()` to cancel the timer.
 */
export class UptimeHeartbeat {
  private enabled = false;
  private namespace: string;

  private intervalInSeconds: number;
  private maxPendingRuns: number;
  private maxPendingIndeces: number;
  private maxPendingErrors: number;

  /** When true, start() performs one heartbeat before the first interval elapses. */
  private leadingEdge: boolean;

  /** Handle of the active interval timer, kept so stop() can cancel it immediately. */
  private heartbeatTimer: ReturnType<typeof setInterval> | undefined;

  private logger = new SimpleLogger("[UptimeHeartbeat]");
  private k8sClient: {
    core: k8s.CoreV1Api;
    kubeConfig: k8s.KubeConfig;
  };

  /**
   * @param opts Heartbeat configuration. Omitted numeric options use the
   *   defaults 60 / 25 / 10 / 10; invalid ones (NaN, Infinity, negative, or a
   *   zero interval) are reported and replaced by the same defaults.
   * @throws Error if `opts.runtimeEnv` is neither "local" nor "kubernetes".
   */
  constructor(private opts: UptimeHeartbeatOptions) {
    this.namespace = opts.namespace ?? "default";

    // A zero or NaN interval would make setInterval fire back-to-back, so the
    // interval must be strictly positive.
    this.intervalInSeconds = this.#numberOption(
      "intervalInSeconds",
      opts.intervalInSeconds,
      60,
      false
    );
    this.maxPendingRuns = this.#numberOption("maxPendingRuns", opts.maxPendingRuns, 25);
    this.maxPendingIndeces = this.#numberOption("maxPendingIndeces", opts.maxPendingIndeces, 10);
    this.maxPendingErrors = this.#numberOption("maxPendingErrors", opts.maxPendingErrors, 10);

    this.leadingEdge = opts.leadingEdge ?? true;

    this.k8sClient = this.#createK8sClient();
  }

  /**
   * Returns `value` when it is a usable number, otherwise `fallback`.
   * An explicitly provided but unusable value is logged, because comparisons
   * against NaN are always false and would silently disable a threshold.
   */
  #numberOption(
    name: string,
    value: number | undefined,
    fallback: number,
    allowZero = true
  ): number {
    if (value === undefined) {
      return fallback;
    }

    if (Number.isFinite(value) && (value > 0 || (allowZero && value === 0))) {
      return value;
    }

    this.logger.error("Invalid numeric option, using default", { name, value, fallback });
    return fallback;
  }

  /** Builds the Kubernetes API client from the config source selected by `runtimeEnv`. */
  #createK8sClient() {
    const kubeConfig = new k8s.KubeConfig();

    if (this.opts.runtimeEnv === "local") {
      kubeConfig.loadFromDefault();
    } else if (this.opts.runtimeEnv === "kubernetes") {
      kubeConfig.loadFromCluster();
    } else {
      throw new Error(`Unsupported runtime environment: ${this.opts.runtimeEnv}`);
    }

    return {
      core: kubeConfig.makeApiClient(k8s.CoreV1Api),
      kubeConfig: kubeConfig,
    };
  }

  /** Type guard: true for any non-null value whose typeof is "object". */
  #isRecord(candidate: unknown): candidate is Record<string, unknown> {
    return typeof candidate === "object" && candidate !== null;
  }

  /** Logs a K8s API error at error level, or at debug level when `debugOnly` is set. */
  #logK8sError(err: unknown, debugOnly = false) {
    if (debugOnly) {
      this.logger.debug("K8s API Error", err);
    } else {
      this.logger.error("K8s API Error", err);
    }
  }

  /**
   * Logs a failed K8s call. When the error carries an object `body`, the full
   * error goes to debug and only `body.message` (or the body itself) is logged
   * at error level; anything else is logged as-is.
   */
  #handleK8sError(err: unknown) {
    if (!this.#isRecord(err) || !this.#isRecord(err.body)) {
      this.#logK8sError(err);
      return;
    }

    this.#logK8sError(err, true);

    if (typeof err.body.message === "string") {
      this.#logK8sError({ message: err.body.message });
      return;
    }

    this.#logK8sError({ body: err.body });
  }

  /**
   * Lists pods in a namespace, optionally filtered by field/label selector.
   *
   * @returns The matching pods, or `undefined` when the API call failed (the
   *   failure is logged).
   */
  async #getPods(opts: {
    namespace: string;
    fieldSelector?: string;
    labelSelector?: string;
  }): Promise<Array<k8s.V1Pod> | undefined> {
    // Fetch twice the largest threshold so every "count > threshold" check in
    // #heartbeat can actually be exceeded, whichever threshold is largest.
    const limit =
      Math.max(this.maxPendingRuns, this.maxPendingIndeces, this.maxPendingErrors) * 2;

    try {
      const { body } = await this.k8sClient.core.listNamespacedPod(
        opts.namespace,
        undefined, // pretty
        undefined, // allowWatchBookmarks
        undefined, // _continue
        opts.fieldSelector,
        opts.labelSelector,
        limit, // limit
        undefined, // resourceVersion
        undefined, // resourceVersionMatch
        undefined, // sendInitialEvents
        this.intervalInSeconds, // timeoutSeconds,
        undefined // watch
      );

      return body.items;
    } catch (err) {
      this.#handleK8sError(err);
      return undefined;
    }
  }

  /** Pending pods labelled `app=task-index`. */
  async #getPendingIndeces(): Promise<Array<k8s.V1Pod> | undefined> {
    return await this.#getPods({
      namespace: this.namespace,
      fieldSelector: "status.phase=Pending",
      labelSelector: "app=task-index",
    });
  }

  /** Pending pods labelled `app=task-run`. */
  async #getPendingTasks(): Promise<Array<k8s.V1Pod> | undefined> {
    return await this.#getPods({
      namespace: this.namespace,
      fieldSelector: "status.phase=Pending",
      labelSelector: "app=task-run",
    });
  }

  /** Number of pods in the list. */
  #countPods(pods: Array<k8s.V1Pod>): number {
    return pods.length;
  }

  /** Keeps the pods whose first container status is waiting with the given reason. */
  #filterPendingPods(
    pods: Array<k8s.V1Pod>,
    waitingReason: "CreateContainerError" | "RunContainerError"
  ): Array<k8s.V1Pod> {
    return pods.filter((pod) => {
      const containerStatus = pod.status?.containerStatuses?.[0];
      return containerStatus?.state?.waiting?.reason === waitingReason;
    });
  }

  /**
   * Fetches `pingUrl`, aborting after half the heartbeat interval. Failures
   * are logged and never thrown.
   */
  async #sendPing() {
    this.logger.log("Sending ping");

    const start = Date.now();
    const controller = new AbortController();

    const timeoutMs = (this.intervalInSeconds * 1000) / 2;

    const fetchTimeout = setTimeout(() => {
      controller.abort();
    }, timeoutMs);

    try {
      const response = await fetch(this.opts.pingUrl, {
        signal: controller.signal,
      });

      if (!response.ok) {
        this.logger.error("Failed to send ping, response not OK", {
          status: response.status,
        });
        return;
      }

      const elapsedMs = Date.now() - start;
      this.logger.log("Ping sent", { elapsedMs });
    } catch (error) {
      if (error instanceof DOMException && error.name === "AbortError") {
        // The value is in milliseconds, so it is logged under a matching key.
        this.logger.log("Ping timeout", { timeoutMs });
        return;
      }

      this.logger.error("Failed to send ping", error);
    } finally {
      clearTimeout(fetchTimeout);
    }
  }

  /**
   * One heartbeat round: list pending task and index pods, skip the ping when
   * a listing fails or any threshold is exceeded, otherwise send the ping.
   */
  async #heartbeat() {
    this.logger.log("Performing heartbeat");

    const start = Date.now();

    const pendingTasks = await this.#getPendingTasks();

    if (!pendingTasks) {
      this.logger.error("Failed to get pending tasks");
      return;
    }

    const totalPendingTasks = this.#countPods(pendingTasks);

    const pendingIndeces = await this.#getPendingIndeces();

    if (!pendingIndeces) {
      this.logger.error("Failed to get pending indeces");
      return;
    }

    const totalPendingIndeces = this.#countPods(pendingIndeces);

    const elapsedMs = Date.now() - start;

    this.logger.log("Finished heartbeat checks", { elapsedMs });

    if (totalPendingTasks > this.maxPendingRuns) {
      this.logger.log("Too many pending tasks, skipping heartbeat", { totalPendingTasks });
      return;
    }

    if (totalPendingIndeces > this.maxPendingIndeces) {
      this.logger.log("Too many pending indeces, skipping heartbeat", { totalPendingIndeces });
      return;
    }

    const totalCreateContainerErrors = this.#countPods(
      this.#filterPendingPods(pendingTasks, "CreateContainerError")
    );
    const totalRunContainerErrors = this.#countPods(
      this.#filterPendingPods(pendingTasks, "RunContainerError")
    );

    if (totalCreateContainerErrors + totalRunContainerErrors > this.maxPendingErrors) {
      this.logger.log("Too many pending tasks with errors, skipping heartbeat", {
        totalRunContainerErrors,
        totalCreateContainerErrors,
      });
      return;
    }

    await this.#sendPing();

    this.logger.log("Heartbeat done", { totalPendingTasks, elapsedMs });
  }

  /** Runs one heartbeat and logs, rather than propagates, any unexpected error. */
  async #safeHeartbeat() {
    try {
      await this.#heartbeat();
    } catch (error) {
      this.logger.error("Error while heartbeating", error);
    }
  }

  /** Cancels the interval timer, if one is active. */
  #clearTimer() {
    if (this.heartbeatTimer !== undefined) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = undefined;
    }
  }

  /**
   * Starts the heartbeat loop. With `leadingEdge` enabled, the returned
   * promise resolves after the first heartbeat has completed. Calling start()
   * while already running is a no-op, so timers never stack.
   */
  async start() {
    if (this.enabled) {
      return;
    }

    this.enabled = true;
    this.logger.log("Starting");

    if (this.leadingEdge) {
      await this.#safeHeartbeat();
    }

    // stop() may have been called while the leading heartbeat was in flight.
    if (!this.enabled) {
      return;
    }

    this.heartbeatTimer = setInterval(() => {
      if (!this.enabled) {
        this.#clearTimer();
        return;
      }

      void this.#safeHeartbeat();
    }, this.intervalInSeconds * 1000);
  }

  /** Stops the heartbeat loop and cancels the timer right away. No-op when not running. */
  async stop() {
    if (!this.enabled) {
      return;
    }

    this.enabled = false;
    this.#clearTimer();
    this.logger.log("Shutting down..");
  }
}

0 commit comments

Comments
 (0)