Skip to content

Commit a82e323

Browse files
committed
pre-pull deployed images on all workers
1 parent e27d5cd commit a82e323

File tree

4 files changed

+160
-6
lines changed

4 files changed

+160
-6
lines changed

apps/kubernetes-provider/src/index.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import {
44
TaskOperations,
55
TaskOperationsCreateOptions,
66
TaskOperationsIndexOptions,
7+
TaskOperationsPrePullImageOptions,
78
TaskOperationsRestoreOptions,
89
} from "@trigger.dev/core-apps/provider";
910
import { SimpleLogger } from "@trigger.dev/core-apps/logger";
@@ -49,6 +50,7 @@ class KubernetesTaskOperations implements TaskOperations {
4950
#k8sApi: {
5051
core: k8s.CoreV1Api;
5152
batch: k8s.BatchV1Api;
53+
apps: k8s.AppsV1Api;
5254
};
5355

5456
constructor(namespace = "default") {
@@ -313,6 +315,72 @@ class KubernetesTaskOperations implements TaskOperations {
313315
await this.#getPod(opts.runId, this.#namespace);
314316
}
315317

318+
async prePullImage(opts: TaskOperationsPrePullImageOptions) {
319+
const metaName = this.#getPrePullContainerName(opts.shortCode);
320+
321+
const metaLabels = {
322+
...this.#getSharedLabels(opts),
323+
app: "task-prepull",
324+
"app.kubernetes.io/part-of": "trigger-worker",
325+
"app.kubernetes.io/component": "prepull",
326+
deployment: opts.deploymentId,
327+
name: metaName,
328+
} satisfies k8s.V1ObjectMeta["labels"];
329+
330+
await this.#createDaemonSet(
331+
{
332+
metadata: {
333+
name: metaName,
334+
namespace: this.#namespace.metadata.name,
335+
labels: metaLabels,
336+
},
337+
spec: {
338+
selector: {
339+
matchLabels: {
340+
name: metaName,
341+
},
342+
},
343+
template: {
344+
metadata: {
345+
labels: metaLabels,
346+
},
347+
spec: {
348+
...this.#defaultPodSpec,
349+
restartPolicy: "Always",
350+
initContainers: [
351+
{
352+
name: "prepull",
353+
image: opts.imageRef,
354+
command: ["/usr/bin/true"],
355+
resources: {
356+
limits: {
357+
cpu: "0.25",
358+
memory: "100Mi",
359+
"ephemeral-storage": "1Gi",
360+
},
361+
},
362+
},
363+
],
364+
containers: [
365+
{
366+
name: "pause",
367+
image: "registry.k8s.io/pause:3.9",
368+
resources: {
369+
limits: {
370+
cpu: "1m",
371+
memory: "12Mi",
372+
},
373+
},
374+
},
375+
],
376+
},
377+
},
378+
},
379+
},
380+
this.#namespace
381+
);
382+
}
383+
316384
#envTypeToLabelValue(type: EnvironmentType) {
317385
switch (type) {
318386
case "PRODUCTION":
@@ -402,7 +470,11 @@ class KubernetesTaskOperations implements TaskOperations {
402470
}
403471

404472
#getSharedLabels(
405-
opts: TaskOperationsIndexOptions | TaskOperationsCreateOptions | TaskOperationsRestoreOptions
473+
opts:
474+
| TaskOperationsIndexOptions
475+
| TaskOperationsCreateOptions
476+
| TaskOperationsRestoreOptions
477+
| TaskOperationsPrePullImageOptions
406478
): Record<string, string> {
407479
return {
408480
env: opts.envId,
@@ -446,6 +518,10 @@ class KubernetesTaskOperations implements TaskOperations {
446518
return `task-run-${suffix}`;
447519
}
448520

521+
#getPrePullContainerName(suffix: string) {
522+
return `task-prepull-${suffix}`;
523+
}
524+
449525
#createK8sApi() {
450526
const kubeConfig = new k8s.KubeConfig();
451527

@@ -460,6 +536,7 @@ class KubernetesTaskOperations implements TaskOperations {
460536
return {
461537
core: kubeConfig.makeApiClient(k8s.CoreV1Api),
462538
batch: kubeConfig.makeApiClient(k8s.BatchV1Api),
539+
apps: kubeConfig.makeApiClient(k8s.AppsV1Api),
463540
};
464541
}
465542

@@ -503,6 +580,18 @@ class KubernetesTaskOperations implements TaskOperations {
503580
}
504581
}
505582

583+
async #createDaemonSet(daemonSet: k8s.V1DaemonSet, namespace: Namespace) {
584+
try {
585+
const res = await this.#k8sApi.apps.createNamespacedDaemonSet(
586+
namespace.metadata.name,
587+
daemonSet
588+
);
589+
logger.debug(res.body);
590+
} catch (err: unknown) {
591+
this.#handleK8sError(err);
592+
}
593+
}
594+
506595
#throwUnlessRecord(candidate: unknown): asserts candidate is Record<string, unknown> {
507596
if (typeof candidate !== "object" || candidate === null) {
508597
throw candidate;

apps/webapp/app/v3/services/createDeployedBackgroundWorker.server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import { logger } from "~/services/logger.server";
1111
import { ExecuteTasksWaitingForDeployService } from "./executeTasksWaitingForDeploy";
1212
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
1313
import { TimeoutDeploymentService } from "./timeoutDeployment.server";
14+
import { socketIo } from "../handleSocketIo.server";
1415

1516
export class CreateDeployedBackgroundWorkerService extends BaseService {
1617
public async call(
@@ -132,6 +133,20 @@ export class CreateDeployedBackgroundWorkerService extends BaseService {
132133
logger.error("Failed to publish WORKER_CREATED event", { err });
133134
}
134135

136+
if (deployment.imageReference) {
137+
socketIo.providerNamespace.emit("PRE_PULL_IMAGE", {
138+
version: "v1",
139+
imageRef: deployment.imageReference,
140+
shortCode: deployment.shortCode,
141+
// identifiers
142+
deploymentId: deployment.id,
143+
envId: environment.id,
144+
envType: environment.type,
145+
orgId: environment.organizationId,
146+
projectId: deployment.projectId,
147+
});
148+
}
149+
135150
await ExecuteTasksWaitingForDeployService.enqueue(backgroundWorker.id, this._prisma);
136151
await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
137152
await TimeoutDeploymentService.dequeue(deployment.id, this._prisma);

packages/core-apps/src/provider.ts

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,17 @@ export interface TaskOperationsRestoreOptions {
6464
checkpointId: string;
6565
}
6666

67+
export interface TaskOperationsPrePullImageOptions {
68+
shortCode: string;
69+
imageRef: string;
70+
// identifiers
71+
envId: string;
72+
envType: EnvironmentType;
73+
orgId: string;
74+
projectId: string;
75+
deploymentId: string;
76+
}
77+
6778
export interface TaskOperations {
6879
init: () => Promise<any>;
6980

@@ -73,8 +84,10 @@ export interface TaskOperations {
7384
restore: (opts: TaskOperationsRestoreOptions) => Promise<any>;
7485

7586
// unimplemented
76-
delete: (...args: any[]) => Promise<any>;
77-
get: (...args: any[]) => Promise<any>;
87+
delete?: (...args: any[]) => Promise<any>;
88+
get?: (...args: any[]) => Promise<any>;
89+
90+
prePullImage?: (opts: TaskOperationsPrePullImageOptions) => Promise<any>;
7891
}
7992

8093
type ProviderShellOptions = {
@@ -277,6 +290,27 @@ export class ProviderShell implements Provider {
277290
logger.error("restore failed", error);
278291
}
279292
},
293+
PRE_PULL_IMAGE: async (message) => {
294+
if (!this.tasks.prePullImage) {
295+
logger.debug("prePullImage not implemented", message);
296+
return;
297+
}
298+
299+
try {
300+
await this.tasks.prePullImage({
301+
shortCode: message.shortCode,
302+
imageRef: message.imageRef,
303+
// identifiers
304+
envId: message.envId,
305+
envType: message.envType,
306+
orgId: message.orgId,
307+
projectId: message.projectId,
308+
deploymentId: message.deploymentId,
309+
});
310+
} catch (error) {
311+
logger.error("prePullImage failed", error);
312+
}
313+
},
280314
},
281315
});
282316

@@ -306,9 +340,12 @@ export class ProviderShell implements Provider {
306340
case "/delete": {
307341
const body = await getTextBody(req);
308342

309-
await this.tasks.delete({ runId: body });
310-
311-
return reply.text(`sent delete request: ${body}`);
343+
if (this.tasks.delete) {
344+
await this.tasks.delete({ runId: body });
345+
return reply.text(`sent delete request: ${body}`);
346+
} else {
347+
return reply.text("delete not implemented", 501);
348+
}
312349
}
313350
default: {
314351
return reply.empty(404);

packages/core/src/v3/schemas/messages.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,19 @@ export const PlatformToProviderMessages = {
367367
runId: z.string(),
368368
}),
369369
},
370+
PRE_PULL_IMAGE: {
371+
message: z.object({
372+
version: z.literal("v1").default("v1"),
373+
imageRef: z.string(),
374+
shortCode: z.string(),
375+
// identifiers
376+
envId: z.string(),
377+
envType: EnvironmentType,
378+
orgId: z.string(),
379+
projectId: z.string(),
380+
deploymentId: z.string(),
381+
}),
382+
},
370383
};
371384

372385
const CreateWorkerMessage = z.object({

0 commit comments

Comments
 (0)