Skip to content

v3: pre-pull deployments for faster startups from the first run #1236

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/brown-boats-bathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core-apps": patch
"@trigger.dev/core": patch
---

Pre-pull deployment images for faster startups
91 changes: 90 additions & 1 deletion apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
TaskOperations,
TaskOperationsCreateOptions,
TaskOperationsIndexOptions,
TaskOperationsPrePullDeploymentOptions,
TaskOperationsRestoreOptions,
} from "@trigger.dev/core-apps/provider";
import { SimpleLogger } from "@trigger.dev/core-apps/logger";
Expand Down Expand Up @@ -49,6 +50,7 @@ class KubernetesTaskOperations implements TaskOperations {
#k8sApi: {
core: k8s.CoreV1Api;
batch: k8s.BatchV1Api;
apps: k8s.AppsV1Api;
};

constructor(namespace = "default") {
Expand Down Expand Up @@ -313,6 +315,72 @@ class KubernetesTaskOperations implements TaskOperations {
await this.#getPod(opts.runId, this.#namespace);
}

async prePullDeployment(opts: TaskOperationsPrePullDeploymentOptions) {
const metaName = this.#getPrePullContainerName(opts.shortCode);

const metaLabels = {
...this.#getSharedLabels(opts),
app: "task-prepull",
"app.kubernetes.io/part-of": "trigger-worker",
"app.kubernetes.io/component": "prepull",
deployment: opts.deploymentId,
name: metaName,
} satisfies k8s.V1ObjectMeta["labels"];

await this.#createDaemonSet(
{
metadata: {
name: metaName,
namespace: this.#namespace.metadata.name,
labels: metaLabels,
},
spec: {
selector: {
matchLabels: {
name: metaName,
},
},
template: {
metadata: {
labels: metaLabels,
},
spec: {
...this.#defaultPodSpec,
restartPolicy: "Always",
initContainers: [
{
name: "prepull",
image: opts.imageRef,
command: ["/usr/bin/true"],
resources: {
limits: {
cpu: "0.25",
memory: "100Mi",
"ephemeral-storage": "1Gi",
},
},
},
],
containers: [
{
name: "pause",
image: "registry.k8s.io/pause:3.9",
resources: {
limits: {
cpu: "1m",
memory: "12Mi",
},
},
},
],
},
},
},
},
this.#namespace
);
}

#envTypeToLabelValue(type: EnvironmentType) {
switch (type) {
case "PRODUCTION":
Expand Down Expand Up @@ -402,7 +470,11 @@ class KubernetesTaskOperations implements TaskOperations {
}

#getSharedLabels(
opts: TaskOperationsIndexOptions | TaskOperationsCreateOptions | TaskOperationsRestoreOptions
opts:
| TaskOperationsIndexOptions
| TaskOperationsCreateOptions
| TaskOperationsRestoreOptions
| TaskOperationsPrePullDeploymentOptions
): Record<string, string> {
return {
env: opts.envId,
Expand Down Expand Up @@ -446,6 +518,10 @@ class KubernetesTaskOperations implements TaskOperations {
return `task-run-${suffix}`;
}

#getPrePullContainerName(suffix: string) {
return `task-prepull-${suffix}`;
}

#createK8sApi() {
const kubeConfig = new k8s.KubeConfig();

Expand All @@ -460,6 +536,7 @@ class KubernetesTaskOperations implements TaskOperations {
return {
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
};
}

Expand Down Expand Up @@ -503,6 +580,18 @@ class KubernetesTaskOperations implements TaskOperations {
}
}

async #createDaemonSet(daemonSet: k8s.V1DaemonSet, namespace: Namespace) {
try {
const res = await this.#k8sApi.apps.createNamespacedDaemonSet(
namespace.metadata.name,
daemonSet
);
logger.debug(res.body);
} catch (err: unknown) {
this.#handleK8sError(err);
}
}

#throwUnlessRecord(candidate: unknown): asserts candidate is Record<string, unknown> {
if (typeof candidate !== "object" || candidate === null) {
throw candidate;
Expand Down
59 changes: 59 additions & 0 deletions apps/kubernetes-provider/src/podCleaner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class PodCleaner {
private logger = new SimpleLogger("[PodCleaner]");
private k8sClient: {
core: k8s.CoreV1Api;
apps: k8s.AppsV1Api;
kubeConfig: k8s.KubeConfig;
};

Expand Down Expand Up @@ -43,6 +44,7 @@ export class PodCleaner {

return {
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
kubeConfig: kubeConfig,
};
}
Expand Down Expand Up @@ -98,6 +100,25 @@ export class PodCleaner {
.catch(this.#handleK8sError.bind(this));
}

async #deleteDaemonSets(opts: {
namespace: string;
dryRun?: boolean;
fieldSelector?: string;
labelSelector?: string;
}) {
return await this.k8sClient.apps
.deleteCollectionNamespacedDaemonSet(
opts.namespace,
undefined, // pretty
undefined, // continue
opts.dryRun ? "All" : undefined,
opts.fieldSelector,
undefined, // gracePeriodSeconds
opts.labelSelector
)
.catch(this.#handleK8sError.bind(this));
}

async #deleteCompletedRuns() {
this.logger.log("Deleting completed runs");

Expand Down Expand Up @@ -152,6 +173,28 @@ export class PodCleaner {
});
}

async #deleteCompletedPrePulls() {
this.logger.log("Deleting completed pre-pulls");

const start = Date.now();

const result = await this.#deleteDaemonSets({
namespace: this.namespace,
labelSelector: "app=task-prepull",
});

const elapsedMs = Date.now() - start;

if (!result) {
this.logger.log("Deleting completed pre-pulls: No delete result", { elapsedMs });
return;
}

const total = (result.response as any)?.body?.items?.length ?? 0;

this.logger.log("Deleting completed pre-pulls: Done", { total, elapsedMs });
}

async start() {
this.enabled = true;
this.logger.log("Starting");
Expand Down Expand Up @@ -186,6 +229,22 @@ export class PodCleaner {
2 * this.intervalInSeconds * 1000
);

const completedPrePullInterval = setInterval(
async () => {
if (!this.enabled) {
clearInterval(completedPrePullInterval);
return;
}

try {
await this.#deleteCompletedPrePulls();
} catch (error) {
this.logger.error("Error deleting completed pre-pulls", error);
}
},
2 * this.intervalInSeconds * 1000
);

// this.#launchTests();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { logger } from "~/services/logger.server";
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
import { TimeoutDeploymentService } from "./timeoutDeployment.server";
import { socketIo } from "../handleSocketIo.server";

export class CreateDeployedBackgroundWorkerService extends BaseService {
public async call(
Expand Down Expand Up @@ -132,6 +133,20 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
logger.error("Failed to publish WORKER_CREATED event", { err });
}

if (deployment.imageReference) {
socketIo.providerNamespace.emit("PRE_PULL_DEPLOYMENT", {
version: "v1",
imageRef: deployment.imageReference,
shortCode: deployment.shortCode,
// identifiers
deploymentId: deployment.id,
envId: environment.id,
envType: environment.type,
orgId: environment.organizationId,
projectId: deployment.projectId,
});
}

await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);
Expand Down
47 changes: 42 additions & 5 deletions packages/core-apps/src/provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ export interface TaskOperationsRestoreOptions {
checkpointId: string;
}

export interface TaskOperationsPrePullDeploymentOptions {
shortCode: string;
imageRef: string;
// identifiers
envId: string;
envType: EnvironmentType;
orgId: string;
projectId: string;
deploymentId: string;
}

export interface TaskOperations {
init: () => Promise<any>;

Expand All @@ -73,8 +84,10 @@ export interface TaskOperations {
restore: (opts: TaskOperationsRestoreOptions) => Promise<any>;

// unimplemented
delete: (...args: any[]) => Promise<any>;
get: (...args: any[]) => Promise<any>;
delete?: (...args: any[]) => Promise<any>;
get?: (...args: any[]) => Promise<any>;

prePullDeployment?: (opts: TaskOperationsPrePullDeploymentOptions) => Promise<any>;
}

type ProviderShellOptions = {
Expand Down Expand Up @@ -277,6 +290,27 @@ export class ProviderShell implements Provider {
logger.error("restore failed", error);
}
},
PRE_PULL_DEPLOYMENT: async (message) => {
if (!this.tasks.prePullDeployment) {
logger.debug("prePullDeployment not implemented", message);
return;
}

try {
await this.tasks.prePullDeployment({
shortCode: message.shortCode,
imageRef: message.imageRef,
// identifiers
envId: message.envId,
envType: message.envType,
orgId: message.orgId,
projectId: message.projectId,
deploymentId: message.deploymentId,
});
} catch (error) {
logger.error("prePullDeployment failed", error);
}
},
},
});

Expand Down Expand Up @@ -306,9 +340,12 @@ export class ProviderShell implements Provider {
case "/delete": {
const body = await getTextBody(req);

await this.tasks.delete({ runId: body });

return reply.text(`sent delete request: ${body}`);
if (this.tasks.delete) {
await this.tasks.delete({ runId: body });
return reply.text(`sent delete request: ${body}`);
} else {
return reply.text("delete not implemented", 501);
}
}
default: {
return reply.empty(404);
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/v3/schemas/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,19 @@ export const PlatformToProviderMessages = {
runId: z.string(),
}),
},
PRE_PULL_DEPLOYMENT: {
message: z.object({
version: z.literal("v1").default("v1"),
imageRef: z.string(),
shortCode: z.string(),
// identifiers
envId: z.string(),
envType: EnvironmentType,
orgId: z.string(),
projectId: z.string(),
deploymentId: z.string(),
}),
},
};

const CreateWorkerMessage = z.object({
Expand Down
Loading