Skip to content

Add supervisor split service control #1774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Mar 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions apps/supervisor/src/env.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,41 @@
import { randomUUID } from "crypto";
import { env as stdEnv } from "std-env";
import { z } from "zod";
import { getDockerHostDomain } from "./util.js";

const BoolEnv = z.preprocess((val) => {
if (typeof val !== "string") {
return val;
}

return ["true", "1"].includes(val.toLowerCase().trim());
}, z.boolean());

const Env = z.object({
// This will come from `status.hostIP` in k8s
WORKER_HOST_IP: z.string().default(getDockerHostDomain()),
TRIGGER_API_URL: z.string().url(),
TRIGGER_WORKER_TOKEN: z.string(),
// This will come from `spec.nodeName` in k8s
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),

// Required settings
TRIGGER_API_URL: z.string().url(),
TRIGGER_WORKER_TOKEN: z.string(),
MANAGED_WORKER_SECRET: z.string(),
TRIGGER_WORKLOAD_API_PORT: z.coerce.number().default(8020),
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020),

// Workload API settings (coordinator mode) - the workload API is what the run controller connects to
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default("true"),
TRIGGER_WORKLOAD_API_PROTOCOL: z
.string()
.transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
.default("http"),
TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default
TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller

// Dequeue settings (provider mode)
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),

// Optional services
TRIGGER_WARM_START_URL: z.string().optional(),
TRIGGER_CHECKPOINT_URL: z.string().optional(),
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),

// Used by the workload manager, e.g docker/k8s
DOCKER_NETWORK: z.string().default("host"),
Expand Down
28 changes: 23 additions & 5 deletions apps/supervisor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class ManagedSupervisor {
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;

constructor() {
const workerApiUrl = `http://${env.WORKER_HOST_IP}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}`;
const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;

if (this.warmStartUrl) {
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
Expand All @@ -40,13 +42,17 @@ class ManagedSupervisor {
if (this.isKubernetes) {
this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
this.workloadManager = new KubernetesWorkloadManager({
workerApiUrl,
workloadApiProtocol,
workloadApiDomain,
workloadApiPort: workloadApiPortExternal,
warmStartUrl: this.warmStartUrl,
});
} else {
this.resourceMonitor = new DockerResourceMonitor(new Docker());
this.workloadManager = new DockerWorkloadManager({
workerApiUrl,
workloadApiProtocol,
workloadApiDomain,
workloadApiPort: workloadApiPortExternal,
warmStartUrl: this.warmStartUrl,
});
}
Expand All @@ -57,6 +63,8 @@ class ManagedSupervisor {
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
preDequeue: async () => {
if (this.isKubernetes) {
// TODO: Test k8s resource monitor and remove this
Expand Down Expand Up @@ -180,7 +188,7 @@ class ManagedSupervisor {

// Responds to workload requests only
this.workloadServer = new WorkloadServer({
port: env.TRIGGER_WORKLOAD_API_PORT,
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
workerClient: this.workerSession.httpClient,
checkpointClient: this.checkpointClient,
});
Expand Down Expand Up @@ -238,7 +246,17 @@ class ManagedSupervisor {
async start() {
this.logger.log("[ManagedWorker] Starting up");

await this.workloadServer.start();
if (env.TRIGGER_WORKLOAD_API_ENABLED) {
this.logger.log("[ManagedWorker] Workload API enabled", {
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
domain: env.TRIGGER_WORKLOAD_API_DOMAIN,
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
});
await this.workloadServer.start();
} else {
this.logger.warn("[ManagedWorker] Workload API disabled");
}

await this.workerSession.start();

await this.httpServer.start();
Expand Down
14 changes: 11 additions & 3 deletions apps/supervisor/src/workloadManager/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ import {
} from "./types.js";
import { x } from "tinyexec";
import { env } from "../env.js";
import { RunnerId } from "../util.js";
import { getDockerHostDomain, RunnerId } from "../util.js";

export class DockerWorkloadManager implements WorkloadManager {
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");

constructor(private opts: WorkloadManagerOptions) {}
constructor(private opts: WorkloadManagerOptions) {
if (opts.workloadApiDomain) {
this.logger.warn("[DockerWorkloadProvider] ⚠️ Custom workload API domain", {
domain: opts.workloadApiDomain,
});
}
}

async create(opts: WorkloadManagerCreateOptions) {
this.logger.log("[DockerWorkloadProvider] Creating container", { opts });
Expand All @@ -24,7 +30,9 @@ export class DockerWorkloadManager implements WorkloadManager {
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`,
`--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`,
`--env=TRIGGER_WORKER_API_URL=${this.opts.workerApiUrl}`,
`--env=TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`,
`--env=TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`,
`--env=TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`,
`--env=TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`,
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`,
`--env=TRIGGER_RUNNER_ID=${runnerId}`,
Expand Down
28 changes: 26 additions & 2 deletions apps/supervisor/src/workloadManager/kubernetes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ export class KubernetesWorkloadManager implements WorkloadManager {

constructor(private opts: WorkloadManagerOptions) {
this.k8s = createK8sApi();

if (opts.workloadApiDomain) {
this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", {
domain: opts.workloadApiDomain,
});
}
}

async create(opts: WorkloadManagerCreateOptions) {
Expand Down Expand Up @@ -72,8 +78,26 @@ export class KubernetesWorkloadManager implements WorkloadManager {
value: opts.snapshotFriendlyId,
},
{
name: "TRIGGER_WORKER_API_URL",
value: this.opts.workerApiUrl,
name: "TRIGGER_SUPERVISOR_API_PROTOCOL",
value: this.opts.workloadApiProtocol,
},
{
name: "TRIGGER_SUPERVISOR_API_PORT",
value: `${this.opts.workloadApiPort}`,
},
{
name: "TRIGGER_SUPERVISOR_API_DOMAIN",
...(this.opts.workloadApiDomain
? {
value: this.opts.workloadApiDomain,
}
: {
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
}),
},
{
name: "TRIGGER_WORKER_INSTANCE_NAME",
Expand Down
4 changes: 3 additions & 1 deletion apps/supervisor/src/workloadManager/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3";

export interface WorkloadManagerOptions {
workerApiUrl: string;
workloadApiProtocol: "http" | "https";
workloadApiDomain?: string; // If unset, will use orchestrator-specific default
workloadApiPort: number;
warmStartUrl?: string;
}

Expand Down
13 changes: 9 additions & 4 deletions packages/cli-v3/src/entryPoints/managed-run-controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ const Env = z.object({
NODE_EXTRA_CA_CERTS: z.string().optional(),

// Set at runtime
TRIGGER_WORKER_API_URL: z.string().url(),
TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]),
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(),
TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`),
TRIGGER_ENV_ID: z.string(),
TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts
Expand Down Expand Up @@ -84,6 +86,8 @@ class ManagedRunController {
private readonly snapshotPoller: HeartbeatService;
private readonly snapshotPollIntervalSeconds: number;

private readonly workerApiUrl: string;

private state:
| {
phase: "RUN";
Expand Down Expand Up @@ -246,8 +250,10 @@ class ManagedRunController {
this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30;
this.snapshotPollIntervalSeconds = 5;

this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`;

this.httpClient = new WorkloadHttpClient({
workerApiUrl: env.TRIGGER_WORKER_API_URL,
workerApiUrl: this.workerApiUrl,
deploymentId: env.TRIGGER_DEPLOYMENT_ID,
runnerId: env.TRIGGER_RUNNER_ID,
});
Expand Down Expand Up @@ -746,8 +752,7 @@ class ManagedRunController {
}

createSocket() {
const wsUrl = new URL(env.TRIGGER_WORKER_API_URL);
wsUrl.pathname = "/workload";
const wsUrl = new URL("/workload", this.workerApiUrl);

this.socket = io(wsUrl.href, {
transports: ["websocket"],
Expand Down
51 changes: 36 additions & 15 deletions packages/core/src/v3/runEngineWorker/supervisor/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import { getDefaultWorkerHeaders } from "./util.js";
import { HeartbeatService } from "../../utils/heartbeat.js";

type SupervisorSessionOptions = SupervisorClientCommonOptions & {
queueConsumerEnabled?: boolean;
runNotificationsEnabled?: boolean;
heartbeatIntervalSeconds?: number;
dequeueIntervalMs?: number;
preDequeue?: PreDequeueFn;
Expand All @@ -20,15 +22,21 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
export class SupervisorSession extends EventEmitter<WorkerEvents> {
public readonly httpClient: SupervisorHttpClient;

private socket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
private readonly runNotificationsEnabled: boolean;
private runNotificationsSocket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;

private readonly queueConsumerEnabled: boolean;
private readonly queueConsumer: RunQueueConsumer;

private readonly heartbeatService: HeartbeatService;
private readonly heartbeatIntervalSeconds: number;

constructor(private opts: SupervisorSessionOptions) {
super();

this.runNotificationsEnabled = opts.runNotificationsEnabled ?? true;
this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true;

this.httpClient = new SupervisorHttpClient(opts);
this.queueConsumer = new RunQueueConsumer({
client: this.httpClient,
Expand Down Expand Up @@ -76,12 +84,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
subscribeToRunNotifications(runFriendlyIds: string[]) {
console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds });

if (!this.socket) {
if (!this.runNotificationsSocket) {
console.error("[SupervisorSession] Socket not connected");
return;
}

this.socket.emit("run:subscribe", { version: "1", runFriendlyIds });
this.runNotificationsSocket.emit("run:subscribe", { version: "1", runFriendlyIds });

Promise.allSettled(
runFriendlyIds.map((runFriendlyId) =>
Expand All @@ -96,12 +104,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
unsubscribeFromRunNotifications(runFriendlyIds: string[]) {
console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds });

if (!this.socket) {
if (!this.runNotificationsSocket) {
console.error("[SupervisorSession] Socket not connected");
return;
}

this.socket.emit("run:unsubscribe", { version: "1", runFriendlyIds });
this.runNotificationsSocket.emit("run:unsubscribe", { version: "1", runFriendlyIds });

Promise.allSettled(
runFriendlyIds.map((runFriendlyId) =>
Expand All @@ -116,15 +124,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
);
}

private createSocket() {
private createRunNotificationsSocket() {
const wsUrl = new URL(this.opts.apiUrl);
wsUrl.pathname = "/worker";

this.socket = io(wsUrl.href, {
const socket = io(wsUrl.href, {
transports: ["websocket"],
extraHeaders: getDefaultWorkerHeaders(this.opts),
});
this.socket.on("run:notify", ({ version, run }) => {
socket.on("run:notify", ({ version, run }) => {
console.log("[SupervisorSession][WS] Received run notification", { version, run });
this.emit("runNotification", { time: new Date(), run });

Expand All @@ -137,15 +145,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
console.error("[SupervisorSession] Failed to send debug log", { error });
});
});
this.socket.on("connect", () => {
socket.on("connect", () => {
console.log("[SupervisorSession][WS] Connected to platform");
});
this.socket.on("connect_error", (error) => {
socket.on("connect_error", (error) => {
console.error("[SupervisorSession][WS] Connection error", { error });
});
this.socket.on("disconnect", (reason, description) => {
socket.on("disconnect", (reason, description) => {
console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description });
});

return socket;
}

async start() {
Expand All @@ -167,14 +177,25 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
name: workerGroup.name,
});

this.queueConsumer.start();
this.heartbeatService.start();
this.createSocket();
if (this.queueConsumerEnabled) {
console.log("[SupervisorSession] Queue consumer enabled");
this.queueConsumer.start();
this.heartbeatService.start();
} else {
console.warn("[SupervisorSession] Queue consumer disabled");
}

if (this.runNotificationsEnabled) {
console.log("[SupervisorSession] Run notifications enabled");
this.runNotificationsSocket = this.createRunNotificationsSocket();
} else {
console.warn("[SupervisorSession] Run notifications disabled");
}
}

async stop() {
this.heartbeatService.stop();
this.socket?.disconnect();
this.runNotificationsSocket?.disconnect();
}

private getHeartbeatBody(): WorkerApiHeartbeatRequestBody {
Expand Down
Loading