Skip to content

Added multiple env variables to kubernetes-provider #1305

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Sep 23, 2024
99 changes: 63 additions & 36 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,30 @@
import * as k8s from "@kubernetes/client-node";
import {
EnvironmentType,
MachinePreset,
PostStartCauses,
PreStopCauses,
} from "@trigger.dev/core/v3";
import {
ProviderShell,
SimpleLogger,
TaskOperations,
TaskOperationsCreateOptions,
TaskOperationsIndexOptions,
TaskOperationsPrePullDeploymentOptions,
TaskOperationsRestoreOptions,
} from "@trigger.dev/core/v3/apps";
import { SimpleLogger } from "@trigger.dev/core/v3/apps";
import {
MachinePreset,
PostStartCauses,
PreStopCauses,
EnvironmentType,
} from "@trigger.dev/core/v3";
import { TaskMonitor } from "./taskMonitor";
import { PodCleaner } from "./podCleaner";
import { TaskMonitor } from "./taskMonitor";
import { UptimeHeartbeat } from "./uptimeHeartbeat";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
const NODE_NAME = process.env.NODE_NAME || "local";
const OTEL_EXPORTER_OTLP_ENDPOINT =
process.env.OTEL_EXPORTER_OTLP_ENDPOINT ?? "http://0.0.0.0:4318";
const COORDINATOR_HOST = process.env.COORDINATOR_HOST ?? undefined;
const COORDINATOR_PORT = process.env.COORDINATOR_PORT ?? undefined;
const KUBERNETES_NAMESPACE = process.env.KUBERNETES_NAMESPACE ?? "default";

const POD_CLEANER_INTERVAL_SECONDS = Number(process.env.POD_CLEANER_INTERVAL_SECONDS || "300");

Expand All @@ -45,19 +48,22 @@ type ResourceQuantities = {
};

class KubernetesTaskOperations implements TaskOperations {
#namespace: Namespace;
#namespace: Namespace = {
metadata: {
name: "default",
},
};

#k8sApi: {
core: k8s.CoreV1Api;
batch: k8s.BatchV1Api;
apps: k8s.AppsV1Api;
};

constructor(namespace = "default") {
this.#namespace = {
metadata: {
name: namespace,
},
};
constructor(opts: { namespace?: string } = {}) {
if (opts.namespace) {
this.#namespace.metadata.name = opts.namespace;
}

this.#k8sApi = this.#createK8sApi();
}
Expand Down Expand Up @@ -229,16 +235,7 @@ class KubernetesTaskOperations implements TaskOperations {
imagePullPolicy: "IfNotPresent",
command: ["/bin/sh", "-c"],
args: ["printenv COORDINATOR_HOST | tee /etc/taskinfo/coordinator-host"],
env: [
{
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
},
],
env: this.#coordinatorEnvVars,
volumeMounts: [
{
name: "taskinfo",
Expand Down Expand Up @@ -409,6 +406,41 @@ class KubernetesTaskOperations implements TaskOperations {
};
}

get #coordinatorHostEnvVar(): k8s.V1EnvVar {
return COORDINATOR_HOST
? {
name: "COORDINATOR_HOST",
value: COORDINATOR_HOST,
}
: {
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
};
}

get #coordinatorPortEnvVar(): k8s.V1EnvVar | undefined {
if (COORDINATOR_PORT) {
return {
name: "COORDINATOR_PORT",
value: COORDINATOR_PORT,
};
}
}

get #coordinatorEnvVars(): k8s.V1EnvVar[] {
const envVars = [this.#coordinatorHostEnvVar];

if (this.#coordinatorPortEnvVar) {
envVars.push(this.#coordinatorPortEnvVar);
}

return envVars;
}

#getSharedEnv(envId: string): k8s.V1EnvVar[] {
return [
{
Expand All @@ -435,14 +467,6 @@ class KubernetesTaskOperations implements TaskOperations {
},
},
},
{
name: "COORDINATOR_HOST",
valueFrom: {
fieldRef: {
fieldPath: "status.hostIP",
},
},
},
{
name: "MACHINE_NAME",
valueFrom: {
Expand All @@ -451,6 +475,7 @@ class KubernetesTaskOperations implements TaskOperations {
},
},
},
...this.#coordinatorEnvVars,
];
}

Expand Down Expand Up @@ -623,7 +648,9 @@ class KubernetesTaskOperations implements TaskOperations {
}

const provider = new ProviderShell({
tasks: new KubernetesTaskOperations(),
tasks: new KubernetesTaskOperations({
namespace: KUBERNETES_NAMESPACE,
}),
type: "kubernetes",
});

Expand Down Expand Up @@ -663,7 +690,7 @@ taskMonitor.start();

const podCleaner = new PodCleaner({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
namespace: KUBERNETES_NAMESPACE,
intervalInSeconds: POD_CLEANER_INTERVAL_SECONDS,
});

Expand All @@ -672,7 +699,7 @@ podCleaner.start();
if (UPTIME_HEARTBEAT_URL) {
const uptimeHeartbeat = new UptimeHeartbeat({
runtimeEnv: RUNTIME_ENV,
namespace: "default",
namespace: KUBERNETES_NAMESPACE,
intervalInSeconds: UPTIME_INTERVAL_SECONDS,
pingUrl: UPTIME_HEARTBEAT_URL,
maxPendingRuns: UPTIME_MAX_PENDING_RUNS,
Expand Down