Skip to content

v3: machine config #978

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/clean-pianos-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core-apps": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

add machine config and secure zod connection
3 changes: 2 additions & 1 deletion apps/coordinator/.env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
HTTP_SERVER_PORT=8020
PLATFORM_ENABLED=true
PLATFORM_WS_PORT=3030
PLATFORM_WS_PORT=3030
SECURE_CONNECTION=false
2 changes: 2 additions & 0 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const PLATFORM_ENABLED = ["1", "true"].includes(process.env.PLATFORM_ENABLED ??
const PLATFORM_HOST = process.env.PLATFORM_HOST || "127.0.0.1";
const PLATFORM_WS_PORT = process.env.PLATFORM_WS_PORT || 3030;
const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");

const logger = new SimpleLogger(`[${NODE_NAME}]`);

Expand Down Expand Up @@ -365,6 +366,7 @@ class TaskCoordinator {
namespace: "coordinator",
host: PLATFORM_HOST,
port: Number(PLATFORM_WS_PORT),
secure: SECURE_CONNECTION,
clientMessages: CoordinatorToPlatformMessages,
serverMessages: PlatformToCoordinatorMessages,
authToken: PLATFORM_SECRET,
Expand Down
1 change: 1 addition & 0 deletions apps/docker-provider/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ HTTP_SERVER_PORT=8050

PLATFORM_WS_PORT=3030
PLATFORM_SECRET=provider-secret
SECURE_CONNECTION=false

# Use this if you are on macOS
# COORDINATOR_HOST="host.docker.internal"
Expand Down
45 changes: 32 additions & 13 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
TaskOperationsIndexOptions,
} from "@trigger.dev/core-apps";
import { setTimeout } from "node:timers/promises";
import { PostStartCauses, PreStopCauses } from "@trigger.dev/core/v3";

const MACHINE_NAME = process.env.MACHINE_NAME || "local";
const COORDINATOR_PORT = process.env.COORDINATOR_PORT || 8020;
Expand Down Expand Up @@ -190,6 +191,9 @@ class DockerTaskOperations implements TaskOperations {
/**
 * Handles a delete request for the container that belongs to a run.
 *
 * After initialization, the run's container is notified through its preStop
 * lifecycle hook. No container removal happens in this method; it only logs
 * afterwards.
 *
 * @param opts.runId - Identifier of the run whose container should be notified.
 * @throws Whatever `#sendPreStop` throws when the hook cannot be delivered.
 */
async delete(opts: { runId: string }) {
  await this.#initialize();

  // Resolve the container for this run and give it the chance to shut down cleanly.
  await this.#sendPreStop(this.#getRunContainerName(opts.runId));

  logger.log("noop: delete");
}

Expand All @@ -208,6 +212,26 @@ class DockerTaskOperations implements TaskOperations {
}

/**
 * Invokes the `postStart` lifecycle hook (cause `restore`) of a container.
 *
 * The container's HTTP server port is looked up first, then the hook command
 * is run and its result is logged at debug level.
 *
 * @param containerName - Name of the container to notify.
 * @throws Error with the message "postStart command failed" when either the
 *   port lookup or the hook command fails; the underlying error is logged.
 */
async #sendPostStart(containerName: string): Promise<void> {
  try {
    const httpPort = await this.#getHttpServerPort(containerName);
    const hookResult = await this.#runLifecycleCommand(
      containerName,
      httpPort,
      "postStart",
      "restore"
    );
    logger.debug(hookResult);
  } catch (error) {
    // Log the real failure, then surface a generic error to the caller.
    logger.error("postStart error", { error });
    throw new Error("postStart command failed");
  }
}

/**
 * Invokes the `preStop` lifecycle hook (cause `terminate`) of a container.
 *
 * The container's HTTP server port is looked up first, then the hook command
 * is run and its result is logged at debug level.
 *
 * @param containerName - Name of the container to notify.
 * @throws Error with the message "preStop command failed" when either the
 *   port lookup or the hook command fails; the underlying error is logged.
 */
async #sendPreStop(containerName: string): Promise<void> {
  try {
    const httpPort = await this.#getHttpServerPort(containerName);
    const hookResult = await this.#runLifecycleCommand(
      containerName,
      httpPort,
      "preStop",
      "terminate"
    );
    logger.debug(hookResult);
  } catch (error) {
    // Log the real failure, then surface a generic error to the caller.
    logger.error("preStop error", { error });
    throw new Error("preStop command failed");
  }
}

async #getHttpServerPort(containerName: string): Promise<number> {
// We first get the correct port, which is random during dev as we run with host networking and need to avoid clashes
// FIXME: Skip this in prod
const logs = logger.debug(await $`docker logs ${containerName}`);
Expand All @@ -219,19 +243,14 @@ class DockerTaskOperations implements TaskOperations {
throw new Error("failed to extract port from logs");
}

try {
logger.debug(await this.#runLifecycleCommand(containerName, port, "postStart", "restore"));
} catch (error) {
logger.error("postStart error", { error });
throw new Error("postStart command failed");
}
return port;
}

async #runLifecycleCommand(
async #runLifecycleCommand<THookType extends "postStart" | "preStop">(
containerName: string,
port: number,
type: "postStart" | "preStop",
cause: "index" | "create" | "restore",
type: THookType,
cause: THookType extends "postStart" ? PostStartCauses : PreStopCauses,
retryCount = 0
): Promise<ExecaChildProcess> {
try {
Expand All @@ -244,15 +263,15 @@ class DockerTaskOperations implements TaskOperations {
`127.0.0.1:${port}/${type}?cause=${cause}`,
]);
} catch (error: any) {
if (retryCount < 6) {
logger.debug("retriable postStart error", { retryCount, message: error?.message });
if (type === "postStart" && retryCount < 6) {
logger.debug(`retriable ${type} error`, { retryCount, message: error?.message });
await setTimeout(exponentialBackoff(retryCount + 1, 2, 50, 1150, 50));

return this.#runLifecycleCommand(containerName, port, type, cause, retryCount + 1);
}

logger.error("final postStart error", { message: error?.message });
throw new Error(`postStart command failed after ${retryCount - 1} retries`);
logger.error(`final ${type} error`, { message: error?.message });
throw new Error(`${type} command failed after ${retryCount - 1} retries`);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions apps/kubernetes-provider/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ HTTP_SERVER_PORT=8060

PLATFORM_WS_PORT=3030
PLATFORM_SECRET=provider-secret
SECURE_CONNECTION=false

# Use this if you are on macOS
# COORDINATOR_HOST="host.docker.internal"
Expand Down
79 changes: 70 additions & 9 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
TaskOperationsIndexOptions,
TaskOperationsRestoreOptions,
} from "@trigger.dev/core-apps";
import { Machine, PostStartCauses, PreStopCauses, EnvironmentType } from "@trigger.dev/core/v3";
import { randomUUID } from "crypto";

const RUNTIME_ENV = process.env.KUBERNETES_PORT ? "kubernetes" : "local";
Expand Down Expand Up @@ -55,6 +56,12 @@ class KubernetesTaskOperations implements TaskOperations {
metadata: {
labels: {
app: "task-index",
"app.kubernetes.io/part-of": "trigger-worker",
"app.kubernetes.io/component": "index",
env: opts.envId,
envtype: this.#envTypeToLabelValue(opts.envType),
org: opts.orgId,
project: opts.projectId,
},
},
spec: {
Expand All @@ -64,6 +71,9 @@ class KubernetesTaskOperations implements TaskOperations {
name: "registry-trigger",
},
],
nodeSelector: {
nodetype: "worker",
},
containers: [
{
name: this.#getIndexContainerName(opts.shortCode),
Expand All @@ -79,6 +89,13 @@ class KubernetesTaskOperations implements TaskOperations {
// memory: "50Mi",
// },
// },
lifecycle: {
preStop: {
exec: {
command: this.#getLifecycleCommand("preStop", "terminate"),
},
},
},
env: [
{
name: "DEBUG",
Expand Down Expand Up @@ -151,6 +168,13 @@ class KubernetesTaskOperations implements TaskOperations {
namespace: this.#namespace.metadata.name,
labels: {
app: "task-run",
"app.kubernetes.io/part-of": "trigger-worker",
"app.kubernetes.io/component": "create",
env: opts.envId,
envtype: this.#envTypeToLabelValue(opts.envType),
org: opts.orgId,
project: opts.projectId,
run: opts.runId,
},
},
spec: {
Expand All @@ -160,6 +184,9 @@ class KubernetesTaskOperations implements TaskOperations {
name: "registry-trigger",
},
],
nodeSelector: {
nodetype: "worker",
},
containers: [
{
name: this.#getRunContainerName(opts.runId),
Expand All @@ -169,9 +196,9 @@ class KubernetesTaskOperations implements TaskOperations {
containerPort: 8000,
},
],
// resources: {
// limits: opts.machine,
// },
resources: {
limits: this.#getResourcesFromMachineConfig(opts.machine),
},
lifecycle: {
postStart: {
exec: {
Expand All @@ -180,7 +207,7 @@ class KubernetesTaskOperations implements TaskOperations {
},
preStop: {
exec: {
command: this.#getLifecycleCommand("preStop", "create"),
command: this.#getLifecycleCommand("preStop", "terminate"),
},
},
},
Expand Down Expand Up @@ -262,6 +289,14 @@ class KubernetesTaskOperations implements TaskOperations {
namespace: this.#namespace.metadata.name,
labels: {
app: "task-run",
"app.kubernetes.io/part-of": "trigger-worker",
"app.kubernetes.io/component": "restore",
env: opts.envId,
envtype: this.#envTypeToLabelValue(opts.envType),
org: opts.orgId,
project: opts.projectId,
run: opts.runId,
checkpoint: opts.checkpointId,
},
},
spec: {
Expand All @@ -271,6 +306,9 @@ class KubernetesTaskOperations implements TaskOperations {
name: "registry-trigger",
},
],
nodeSelector: {
nodetype: "worker",
},
initContainers: [
{
name: "pull-base-image",
Expand Down Expand Up @@ -309,9 +347,9 @@ class KubernetesTaskOperations implements TaskOperations {
containerPort: 8000,
},
],
// resources: {
// limits: opts.machine,
// },
resources: {
limits: this.#getResourcesFromMachineConfig(opts.machine),
},
lifecycle: {
postStart: {
exec: {
Expand All @@ -320,7 +358,7 @@ class KubernetesTaskOperations implements TaskOperations {
},
preStop: {
exec: {
command: this.#getLifecycleCommand("preStop", "restore"),
command: this.#getLifecycleCommand("preStop", "terminate"),
},
},
},
Expand Down Expand Up @@ -355,7 +393,30 @@ class KubernetesTaskOperations implements TaskOperations {
await this.#getPod(opts.runId, this.#namespace);
}

#getLifecycleCommand(type: "postStart" | "preStop", cause: "index" | "create" | "restore") {
/**
 * Maps an environment type to the short string used as the `envtype` pod label.
 *
 * Mapping: PRODUCTION -> "prod", STAGING -> "stg", DEVELOPMENT -> "dev",
 * PREVIEW -> "preview".
 *
 * There is no `default` branch, so a value outside these four cases falls
 * through and the method returns `undefined`.
 * NOTE(review): presumably these four cases cover every member of
 * `EnvironmentType` — confirm against its declaration.
 *
 * @param type - Environment type to translate.
 * @returns The abbreviated label value for the given environment type.
 */
#envTypeToLabelValue(type: EnvironmentType) {
switch (type) {
case "PRODUCTION":
return "prod";
case "STAGING":
return "stg";
case "DEVELOPMENT":
return "dev";
case "PREVIEW":
return "preview";
}
}

/**
 * Converts a machine config into the string quantities used for the
 * container's `resources.limits`.
 *
 * `cpu` is stringified as-is; `memory` is stringified with a `G` suffix.
 * NOTE(review): in Kubernetes quantities `G` means 10^9 bytes while `Gi`
 * means 2^30 bytes — confirm which unit `config.memory` is expressed in.
 *
 * @param config - Machine config providing `cpu` and `memory`.
 * @returns An object with `cpu` and `memory` as strings.
 */
#getResourcesFromMachineConfig(config: Machine) {
  const { cpu, memory } = config;

  const limits = {
    cpu: `${cpu}`,
    memory: `${memory}G`,
  };

  return limits;
}

/**
 * Builds the exec command for a container lifecycle hook.
 *
 * The command runs a shell that sleeps for one second and then uses `wget`
 * to request `/<type>?cause=<cause>` on 127.0.0.1:8000, printing the response
 * to stdout.
 *
 * @param type - Which hook the command is for: `postStart` or `preStop`.
 * @param cause - Why the hook fires; the allowed values depend on `type`.
 * @returns The command as an argv array: `["/bin/sh", "-c", <script>]`.
 */
#getLifecycleCommand<THookType extends "postStart" | "preStop">(
  type: THookType,
  cause: THookType extends "postStart" ? PostStartCauses : PreStopCauses
) {
  const hookUrl = `127.0.0.1:8000/${type}?cause=${cause}`;
  const shellScript = `sleep 1; wget -q -O- ${hookUrl}`;

  return ["/bin/sh", "-c", shellScript];
}

Expand Down
27 changes: 25 additions & 2 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Context, ROOT_CONTEXT, Span, SpanKind, context, trace } from "@opentelemetry/api";
import {
Machine,
ProdTaskRunExecution,
ProdTaskRunExecutionPayload,
TaskRunError,
Expand Down Expand Up @@ -438,10 +439,27 @@ export class SharedQueueConsumer {
queueId: queue.id,
runtimeEnvironmentId: environment.id,
},
include: {
backgroundWorkerTask: true,
},
});

const isRetry = taskRunAttempt.number > 1;

const { machineConfig } = taskRunAttempt.backgroundWorkerTask;
const machine = Machine.safeParse(machineConfig ?? {});

if (!machine.success) {
logger.error("Failed to parse machine config", {
queueMessage: message.data,
messageId: message.messageId,
attemptId: taskRunAttempt.id,
machineConfig,
});

await this.#ackAndDoMoreWork(message.messageId);
return;
}
try {
if (messageBody.data.checkpointEventId) {
const restoreService = new RestoreCheckpointService();
Expand Down Expand Up @@ -470,11 +488,16 @@ export class SharedQueueConsumer {
backgroundWorkerId: deployment.worker.friendlyId,
data: {
type: "SCHEDULE_ATTEMPT",
id: taskRunAttempt.id,
image: deployment.imageReference,
version: deployment.version,
machine: machine.data,
// identifiers
id: taskRunAttempt.id,
envId: environment.id,
envType: environment.type,
orgId: environment.organizationId,
projectId: environment.projectId,
runId: taskRunAttempt.taskRunId,
version: deployment.version,
},
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ export async function createBackgroundTasks(
exportName: task.exportName,
retryConfig: task.retry,
queueConfig: task.queue,
machineConfig: task.machine,
},
});

Expand Down
6 changes: 5 additions & 1 deletion apps/webapp/app/v3/services/indexDeployment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ export class IndexDeploymentService extends BaseService {
version: "v1",
shortCode: deployment.shortCode,
imageTag: deployment.imageReference,
envId: deployment.environmentId,
apiKey: deployment.environment.apiKey,
apiUrl: env.APP_ORIGIN,
// identifiers
envId: deployment.environmentId,
envType: deployment.environment.type,
projectId: deployment.projectId,
orgId: deployment.environment.organizationId,
});

logger.debug("Index ACK received", { responses });
Expand Down
Loading