Skip to content

Commit b66d552

Browse files
authored
v3: machine config (#978)
* add and use machine config * assign tasks to worker nodes only * add secure flag to zod connection * changeset * add pre stop hook * don't use secure connection by default * pass more identifiers to provider and apply labels
1 parent 719c0a0 commit b66d552

File tree

22 files changed

+328
-131
lines changed

22 files changed

+328
-131
lines changed

.changeset/clean-pianos-listen.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core-apps": patch
4+
"trigger.dev": patch
5+
"@trigger.dev/core": patch
6+
---
7+
8+
add machine config and secure zod connection

apps/coordinator/.env.example

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
HTTP_SERVER_PORT=8020
22
PLATFORM_ENABLED=true
3-
PLATFORM_WS_PORT=3030
3+
PLATFORM_WS_PORT=3030
4+
SECURE_CONNECTION=false

apps/coordinator/src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const PLATFORM_ENABLED = ["1", "true"].includes(process.env.PLATFORM_ENABLED ??
2626
const PLATFORM_HOST = process.env.PLATFORM_HOST || "127.0.0.1";
2727
const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030;
2828
const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
29+
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");
2930

3031
const logger = new SimpleLogger(`[${NODE_NAME}]`);
3132

@@ -365,6 +366,7 @@ class TaskCoordinator {
365366
namespace: "coordinator",
366367
host: PLATFORM_HOST,
367368
port: Number(PLATFORM_WS_PORT),
369+
secure: SECURE_CONNECTION,
368370
clientMessages: CoordinatorToPlatformMessages,
369371
serverMessages: PlatformToCoordinatorMessages,
370372
authToken: PLATFORM_SECRET,

apps/docker-provider/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ HTTP_SERVER_PORT=8050
22

33
PLATFORM_WS_PORT=3030
44
PLATFORM_SECRET=provider-secret
5+
SECURE_CONNECTION=false
56

67
# Use this if you are on macOS
78
# COORDINATOR_HOST="host.docker.internal"

apps/docker-provider/src/index.ts

Lines changed: 32 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
TaskOperationsIndexOptions,
99
} from "@trigger.dev/core-apps";
1010
import { setTimeout } from "node:timers/promises";
11+
import { PostStartCauses, PreStopCauses } from "@trigger.dev/core/v3";
1112

1213
const MACHINE_NAME = process.env.MACHINE_NAME || "local";
1314
const COORDINATOR_PORT = process.env.COORDINATOR_PORT || 8020;
@@ -190,6 +191,9 @@ class DockerTaskOperations implements TaskOperations {
190191
async delete(opts: { runId: string }) {
191192
await this.#initialize();
192193

194+
const containerName = this.#getRunContainerName(opts.runId);
195+
await this.#sendPreStop(containerName);
196+
193197
logger.log("noop: delete");
194198
}
195199

@@ -208,6 +212,26 @@ class DockerTaskOperations implements TaskOperations {
208212
}
209213

210214
async #sendPostStart(containerName: string): Promise<void> {
215+
try {
216+
const port = await this.#getHttpServerPort(containerName);
217+
logger.debug(await this.#runLifecycleCommand(containerName, port, "postStart", "restore"));
218+
} catch (error) {
219+
logger.error("postStart error", { error });
220+
throw new Error("postStart command failed");
221+
}
222+
}
223+
224+
async #sendPreStop(containerName: string): Promise<void> {
225+
try {
226+
const port = await this.#getHttpServerPort(containerName);
227+
logger.debug(await this.#runLifecycleCommand(containerName, port, "preStop", "terminate"));
228+
} catch (error) {
229+
logger.error("preStop error", { error });
230+
throw new Error("preStop command failed");
231+
}
232+
}
233+
234+
async #getHttpServerPort(containerName: string): Promise<number> {
211235
// We first get the correct port, which is random during dev as we run with host networking and need to avoid clashes
212236
// FIXME: Skip this in prod
213237
const logs = logger.debug(await $`docker logs ${containerName}`);
@@ -219,19 +243,14 @@ class DockerTaskOperations implements TaskOperations {
219243
throw new Error("failed to extract port from logs");
220244
}
221245

222-
try {
223-
logger.debug(await this.#runLifecycleCommand(containerName, port, "postStart", "restore"));
224-
} catch (error) {
225-
logger.error("postStart error", { error });
226-
throw new Error("postStart command failed");
227-
}
246+
return port;
228247
}
229248

230-
async #runLifecycleCommand(
249+
async #runLifecycleCommand<THookType extends "postStart" | "preStop">(
231250
containerName: string,
232251
port: number,
233-
type: "postStart" | "preStop",
234-
cause: "index" | "create" | "restore",
252+
type: THookType,
253+
cause: THookType extends "postStart" ? PostStartCauses : PreStopCauses,
235254
retryCount = 0
236255
): Promise<ExecaChildProcess> {
237256
try {
@@ -244,15 +263,15 @@ class DockerTaskOperations implements TaskOperations {
244263
`127.0.0.1:${port}/${type}?cause=${cause}`,
245264
]);
246265
} catch (error: any) {
247-
if (retryCount < 6) {
248-
logger.debug("retriable postStart error", { retryCount, message: error?.message });
266+
if (type === "postStart" && retryCount < 6) {
267+
logger.debug(`retriable ${type} error`, { retryCount, message: error?.message });
249268
await setTimeout(exponentialBackoff(retryCount + 1, 2, 50, 1150, 50));
250269

251270
return this.#runLifecycleCommand(containerName, port, type, cause, retryCount + 1);
252271
}
253272

254-
logger.error("final postStart error", { message: error?.message });
255-
throw new Error(`postStart command failed after ${retryCount - 1} retries`);
273+
logger.error(`final ${type} error`, { message: error?.message });
274+
throw new Error(`${type} command failed after ${retryCount - 1} retries`);
256275
}
257276
}
258277
}

apps/kubernetes-provider/.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ HTTP_SERVER_PORT=8060
22

33
PLATFORM_WS_PORT=3030
44
PLATFORM_SECRET=provider-secret
5+
SECURE_CONNECTION=false
56

67
# Use this if you are on macOS
78
# COORDINATOR_HOST="host.docker.internal"

apps/kubernetes-provider/src/index.ts

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
TaskOperationsIndexOptions,
88
TaskOperationsRestoreOptions,
99
} from "@trigger.dev/core-apps";
10+
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
1011
import { randomUUID } from "crypto";
1112

1213
const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
@@ -55,6 +56,12 @@ class KubernetesTaskOperations implements TaskOperations {
5556
metadata: {
5657
labels: {
5758
app: "task-index",
59+
"app.kubernetes.io/part-of": "trigger-worker",
60+
"app.kubernetes.io/component": "index",
61+
env: opts.envId,
62+
envtype: this.#envTypeToLabelValue(opts.envType),
63+
org: opts.orgId,
64+
project: opts.projectId,
5865
},
5966
},
6067
spec: {
@@ -64,6 +71,9 @@ class KubernetesTaskOperations implements TaskOperations {
6471
name: "registry-trigger",
6572
},
6673
],
74+
nodeSelector: {
75+
nodetype: "worker",
76+
},
6777
containers: [
6878
{
6979
name: this.#getIndexContainerName(opts.shortCode),
@@ -79,6 +89,13 @@ class KubernetesTaskOperations implements TaskOperations {
7989
// memory: "50Mi",
8090
// },
8191
// },
92+
lifecycle: {
93+
preStop: {
94+
exec: {
95+
command: this.#getLifecycleCommand("preStop", "terminate"),
96+
},
97+
},
98+
},
8299
env: [
83100
{
84101
name: "DEBUG",
@@ -151,6 +168,13 @@ class KubernetesTaskOperations implements TaskOperations {
151168
namespace: this.#namespace.metadata.name,
152169
labels: {
153170
app: "task-run",
171+
"app.kubernetes.io/part-of": "trigger-worker",
172+
"app.kubernetes.io/component": "create",
173+
env: opts.envId,
174+
envtype: this.#envTypeToLabelValue(opts.envType),
175+
org: opts.orgId,
176+
project: opts.projectId,
177+
run: opts.runId,
154178
},
155179
},
156180
spec: {
@@ -160,6 +184,9 @@ class KubernetesTaskOperations implements TaskOperations {
160184
name: "registry-trigger",
161185
},
162186
],
187+
nodeSelector: {
188+
nodetype: "worker",
189+
},
163190
containers: [
164191
{
165192
name: this.#getRunContainerName(opts.runId),
@@ -169,9 +196,9 @@ class KubernetesTaskOperations implements TaskOperations {
169196
containerPort: 8000,
170197
},
171198
],
172-
// resources: {
173-
// limits: opts.machine,
174-
// },
199+
resources: {
200+
limits: this.#getResourcesFromMachineConfig(opts.machine),
201+
},
175202
lifecycle: {
176203
postStart: {
177204
exec: {
@@ -180,7 +207,7 @@ class KubernetesTaskOperations implements TaskOperations {
180207
},
181208
preStop: {
182209
exec: {
183-
command: this.#getLifecycleCommand("preStop", "create"),
210+
command: this.#getLifecycleCommand("preStop", "terminate"),
184211
},
185212
},
186213
},
@@ -262,6 +289,14 @@ class KubernetesTaskOperations implements TaskOperations {
262289
namespace: this.#namespace.metadata.name,
263290
labels: {
264291
app: "task-run",
292+
"app.kubernetes.io/part-of": "trigger-worker",
293+
"app.kubernetes.io/component": "restore",
294+
env: opts.envId,
295+
envtype: this.#envTypeToLabelValue(opts.envType),
296+
org: opts.orgId,
297+
project: opts.projectId,
298+
run: opts.runId,
299+
checkpoint: opts.checkpointId,
265300
},
266301
},
267302
spec: {
@@ -271,6 +306,9 @@ class KubernetesTaskOperations implements TaskOperations {
271306
name: "registry-trigger",
272307
},
273308
],
309+
nodeSelector: {
310+
nodetype: "worker",
311+
},
274312
initContainers: [
275313
{
276314
name: "pull-base-image",
@@ -309,9 +347,9 @@ class KubernetesTaskOperations implements TaskOperations {
309347
containerPort: 8000,
310348
},
311349
],
312-
// resources: {
313-
// limits: opts.machine,
314-
// },
350+
resources: {
351+
limits: this.#getResourcesFromMachineConfig(opts.machine),
352+
},
315353
lifecycle: {
316354
postStart: {
317355
exec: {
@@ -320,7 +358,7 @@ class KubernetesTaskOperations implements TaskOperations {
320358
},
321359
preStop: {
322360
exec: {
323-
command: this.#getLifecycleCommand("preStop", "restore"),
361+
command: this.#getLifecycleCommand("preStop", "terminate"),
324362
},
325363
},
326364
},
@@ -355,7 +393,30 @@ class KubernetesTaskOperations implements TaskOperations {
355393
await this.#getPod(opts.runId, this.#namespace);
356394
}
357395

358-
#getLifecycleCommand(type: "postStart" | "preStop", cause: "index" | "create" | "restore") {
396+
#envTypeToLabelValue(type: EnvironmentType) {
397+
switch (type) {
398+
case "PRODUCTION":
399+
return "prod";
400+
case "STAGING":
401+
return "stg";
402+
case "DEVELOPMENT":
403+
return "dev";
404+
case "PREVIEW":
405+
return "preview";
406+
}
407+
}
408+
409+
#getResourcesFromMachineConfig(config: Machine) {
410+
return {
411+
cpu: `${config.cpu}`,
412+
memory: `${config.memory}G`,
413+
};
414+
}
415+
416+
#getLifecycleCommand<THookType extends "postStart" | "preStop">(
417+
type: THookType,
418+
cause: THookType extends "postStart" ? PostStartCauses : PreStopCauses
419+
) {
359420
return ["/bin/sh", "-c", `sleep 1; wget -q -O- 127.0.0.1:8000/${type}?cause=${cause}`];
360421
}
361422

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api";
22
import {
3+
Machine,
34
ProdTaskRunExecution,
45
ProdTaskRunExecutionPayload,
56
TaskRunError,
@@ -438,10 +439,27 @@ export class SharedQueueConsumer {
438439
queueId: queue.id,
439440
runtimeEnvironmentId: environment.id,
440441
},
442+
include: {
443+
backgroundWorkerTask: true,
444+
},
441445
});
442446

443447
const isRetry = taskRunAttempt.number > 1;
444448

449+
const { machineConfig } = taskRunAttempt.backgroundWorkerTask;
450+
const machine = Machine.safeParse(machineConfig ?? {});
451+
452+
if (!machine.success) {
453+
logger.error("Failed to parse machine config", {
454+
queueMessage: message.data,
455+
messageId: message.messageId,
456+
attemptId: taskRunAttempt.id,
457+
machineConfig,
458+
});
459+
460+
await this.#ackAndDoMoreWork(message.messageId);
461+
return;
462+
}
445463
try {
446464
if (messageBody.data.checkpointEventId) {
447465
const restoreService = new RestoreCheckpointService();
@@ -470,11 +488,16 @@ export class SharedQueueConsumer {
470488
backgroundWorkerId: deployment.worker.friendlyId,
471489
data: {
472490
type: "SCHEDULE_ATTEMPT",
473-
id: taskRunAttempt.id,
474491
image: deployment.imageReference,
492+
version: deployment.version,
493+
machine: machine.data,
494+
// identifiers
495+
id: taskRunAttempt.id,
475496
envId: environment.id,
497+
envType: environment.type,
498+
orgId: environment.organizationId,
499+
projectId: environment.projectId,
476500
runId: taskRunAttempt.taskRunId,
477-
version: deployment.version,
478501
},
479502
});
480503
}

apps/webapp/app/v3/services/createBackgroundWorker.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ export async function createBackgroundTasks(
101101
exportName: task.exportName,
102102
retryConfig: task.retry,
103103
queueConfig: task.queue,
104+
machineConfig: task.machine,
104105
},
105106
});
106107

apps/webapp/app/v3/services/indexDeployment.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,13 @@ export class IndexDeploymentService extends BaseService {
4949
version: "v1",
5050
shortCode: deployment.shortCode,
5151
imageTag: deployment.imageReference,
52-
envId: deployment.environmentId,
5352
apiKey: deployment.environment.apiKey,
5453
apiUrl: env.APP_ORIGIN,
54+
// identifiers
55+
envId: deployment.environmentId,
56+
envType: deployment.environment.type,
57+
projectId: deployment.projectId,
58+
orgId: deployment.environment.organizationId,
5559
});
5660

5761
logger.debug("Index ACK received", { responses });

0 commit comments

Comments
 (0)