Skip to content

v3: checkpoint failover and misc fixes #1157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/rude-toys-compare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/core-apps": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Capture and display stderr on index failures
7 changes: 7 additions & 0 deletions .changeset/slow-sloths-retire.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@trigger.dev/core-apps": patch
"@trigger.dev/core": patch
---

- Fix uncaught provider exception
- Remove unused provider messages
3 changes: 2 additions & 1 deletion apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ const SIMULATE_CHECKPOINT_FAILURE_SECONDS = parseInt(
);

const REGISTRY_HOST = process.env.REGISTRY_HOST || "localhost:5000";
const REGISTRY_NAMESPACE = process.env.REGISTRY_NAMESPACE || "trigger";
const CHECKPOINT_PATH = process.env.CHECKPOINT_PATH || "/checkpoints";
const REGISTRY_TLS_VERIFY = process.env.REGISTRY_TLS_VERIFY === "false" ? "false" : "true";

Expand Down Expand Up @@ -179,7 +180,7 @@ class Checkpointer {
}

#getImageRef(projectRef: string, deploymentVersion: string, shortCode: string) {
return `${REGISTRY_HOST}/trigger/${projectRef}:${deploymentVersion}.prod-${shortCode}`;
return `${REGISTRY_HOST}/${REGISTRY_NAMESPACE}/${projectRef}:${deploymentVersion}.prod-${shortCode}`;
}

#getExportLocation(projectRef: string, deploymentVersion: string, shortCode: string) {
Expand Down
48 changes: 17 additions & 31 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,37 +85,23 @@ class DockerTaskOperations implements TaskOperations {
port: COORDINATOR_PORT,
});

try {
logger.debug(
await execa("docker", [
"run",
"--network=host",
"--rm",
`--env=INDEX_TASKS=true`,
`--env=TRIGGER_SECRET_KEY=${opts.apiKey}`,
`--env=TRIGGER_API_URL=${opts.apiUrl}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}`,
`--env=POD_NAME=${containerName}`,
`--env=COORDINATOR_HOST=${COORDINATOR_HOST}`,
`--env=COORDINATOR_PORT=${COORDINATOR_PORT}`,
`--name=${containerName}`,
`${opts.imageRef}`,
])
);
} catch (error: any) {
if (!isExecaChildProcess(error)) {
throw error;
}

logger.error("Index failed:", {
opts,
exitCode: error.exitCode,
escapedCommand: error.escapedCommand,
stdout: error.stdout,
stderr: error.stderr,
});
}
logger.debug(
await execa("docker", [
"run",
"--network=host",
"--rm",
`--env=INDEX_TASKS=true`,
`--env=TRIGGER_SECRET_KEY=${opts.apiKey}`,
`--env=TRIGGER_API_URL=${opts.apiUrl}`,
`--env=TRIGGER_ENV_ID=${opts.envId}`,
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${OTEL_EXPORTER_OTLP_ENDPOINT}`,
`--env=POD_NAME=${containerName}`,
`--env=COORDINATOR_HOST=${COORDINATOR_HOST}`,
`--env=COORDINATOR_PORT=${COORDINATOR_PORT}`,
`--name=${containerName}`,
`${opts.imageRef}`,
])
);
}

async create(opts: TaskOperationsCreateOptions) {
Expand Down
11 changes: 11 additions & 0 deletions apps/webapp/app/components/runs/v3/DeploymentError.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ export function DeploymentError({ errorData }: DeploymentErrorProps) {
maxLines={20}
/>
)}
{errorData.stderr && (
<>
<DeploymentErrorHeader title="Error logs:" />
<CodeBlock
showCopyButton={false}
showLineNumbers={false}
code={errorData.stderr}
maxLines={20}
/>
</>
)}
</div>
);
}
Expand Down
5 changes: 5 additions & 0 deletions apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type ErrorData = {
name: string;
message: string;
stack?: string;
stderr?: string;
};

export class DeploymentPresenter {
Expand Down Expand Up @@ -177,17 +178,20 @@ export class DeploymentPresenter {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stack: createTaskMetadataFailedErrorStack(parsedError.data),
stderr: parsedErrorData.data.stderr,
};
} else {
return {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stderr: parsedErrorData.data.stderr,
};
}
} else {
return {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stderr: parsedErrorData.data.stderr,
};
}
}
Expand All @@ -196,6 +200,7 @@ export class DeploymentPresenter {
name: parsedErrorData.data.name,
message: parsedErrorData.data.message,
stack: parsedErrorData.data.stack,
stderr: parsedErrorData.data.stderr,
};
}
}
Expand Down
41 changes: 37 additions & 4 deletions apps/webapp/app/v3/services/deploymentIndexFailed.server.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,63 @@
import { PerformDeploymentAlertsService } from "./alerts/performDeploymentAlerts.server";
import { BaseService } from "./baseService.server";
import { logger } from "~/services/logger.server";
import { WorkerDeploymentStatus } from "@trigger.dev/database";

// Deployment statuses treated as final: DeploymentIndexFailed.call returns
// early (without updating the record) when the deployment's current status is
// one of these.
const FINAL_DEPLOYMENT_STATUSES: WorkerDeploymentStatus[] = [
"CANCELED",
"DEPLOYED",
"FAILED",
"TIMED_OUT",
];

/**
 * Service that records an indexing failure on a worker deployment.
 *
 * NOTE(review): this span appears to be a rendered diff with removed and added
 * lines interleaved (two `error` parameter declarations, two
 * `await this._prisma.workerDeployment...` openers, two `enqueue` calls and two
 * `return`s) — confirm against the real file before relying on the exact text.
 */
export class DeploymentIndexFailed extends BaseService {
/**
 * Looks up the deployment, bails out when it is missing or already in a final
 * status, otherwise marks it as FAILED and enqueues deployment alerts.
 *
 * @param maybeFriendlyId Deployment identifier; looked up by `friendlyId` when
 *   it starts with "deployment_", otherwise by `id`.
 * @param error Error details written to the deployment's `errorData`
 *   (`name`, `message`, optional `stack` and `stderr`).
 * @returns The result of the `workerDeployment.update` call, or `undefined`
 *   when the deployment was not found or was already in a final status.
 */
public async call(
maybeFriendlyId: string,
error: { name: string; message: string; stack?: string }
error: {
name: string;
message: string;
stack?: string;
stderr?: string;
}
) {
// The "deployment_" prefix decides which unique field is used for the lookup.
const isFriendlyId = maybeFriendlyId.startsWith("deployment_");

const deployment = await this._prisma.workerDeployment.update({
const deployment = await this._prisma.workerDeployment.findUnique({
where: isFriendlyId
? {
friendlyId: maybeFriendlyId,
}
: {
id: maybeFriendlyId,
},
});

// Nothing to mark as failed: log and return without enqueuing alerts.
if (!deployment) {
logger.error("Worker deployment not found", { maybeFriendlyId });
return;
}

// Do not overwrite a deployment whose status is already final
// (see FINAL_DEPLOYMENT_STATUSES); no alerts are enqueued in that case either.
if (FINAL_DEPLOYMENT_STATUSES.includes(deployment.status)) {
logger.error("Worker deployment already in final state", {
id: deployment.id,
status: deployment.status,
});
return;
}

// Update by primary `id` (taken from the record found above), storing the
// failure time and the supplied error payload.
const failedDeployment = await this._prisma.workerDeployment.update({
where: {
id: deployment.id,
},
data: {
status: "FAILED",
failedAt: new Date(),
errorData: error,
},
});

await PerformDeploymentAlertsService.enqueue(deployment.id, this._prisma);
await PerformDeploymentAlertsService.enqueue(failedDeployment.id, this._prisma);

return deployment;
return failedDeployment;
}
}
4 changes: 3 additions & 1 deletion docs/v3/open-source-self-hosting.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ scp -3 root@<webapp_machine>:docker/.env root@<worker_machine>:docker/.env
Checkpointing allows you to save the state of a running container to disk and restore it later. This can be useful for
long-running tasks that need to be paused and resumed without losing state, such as fan-out/fan-in workflows or long waits in email campaigns.

The checkpoints will be pushed to the same registry as the deployed images. Please see the [Registry setup](#registry-setup) section for more information.

### Requirements

- Debian, **NOT** a derivative like Ubuntu
Expand All @@ -225,7 +227,7 @@ sudo apt-get install criu
2. Tweak the config so we can successfully checkpoint our workloads

```bash
mkdir /etc/criu
mkdir -p /etc/criu

cat << EOF >/etc/criu/runc.conf
tcp-close
Expand Down
4 changes: 4 additions & 0 deletions packages/cli-v3/src/commands/deploy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,10 @@ async function _deployCommand(dir: string, options: DeployCommandOptions) {

await preExitTasks();

if (finishedDeployment.errorData.stderr) {
log.error(`stderr:\n${finishedDeployment.errorData.stderr}`);
}

throw new SkipLoggingError(
`Deployment encountered an error: ${finishedDeployment.errorData.name}`
);
Expand Down
34 changes: 18 additions & 16 deletions packages/cli-v3/src/workers/prod/backgroundWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ export class ProdBackgroundWorker {
private _onClose: Evt<void> = new Evt();

public tasks: Array<TaskMetadataWithFilePath> = [];
public stderr: Array<string> = [];

_taskRunProcess: TaskRunProcess | undefined;
private _taskRunProcessesBeingKilled: Map<number, TaskRunProcess> = new Map();
Expand Down Expand Up @@ -161,6 +162,23 @@ export class ProdBackgroundWorker {
reject(new Error("Worker timed out"));
}, 10_000);

child.stdout?.on("data", (data) => {
console.log(data.toString());
});

child.stderr?.on("data", (data) => {
console.error(data.toString());
this.stderr.push(data.toString());
});

child.on("exit", (code) => {
if (!resolved) {
clearTimeout(timeout);
resolved = true;
reject(new Error(`Worker exited with code ${code}`));
}
});

new ZodIpcConnection({
listenSchema: ProdChildToWorkerMessages,
emitSchema: ProdWorkerToChildMessages,
Expand Down Expand Up @@ -192,22 +210,6 @@ export class ProdBackgroundWorker {
},
},
});

child.stdout?.on("data", (data) => {
console.log(data.toString());
});

child.stderr?.on("data", (data) => {
console.error(data.toString());
});

child.on("exit", (code) => {
if (!resolved) {
clearTimeout(timeout);
resolved = true;
reject(new Error(`Worker exited with code ${code}`));
}
});
});

this._initialized = true;
Expand Down
7 changes: 7 additions & 0 deletions packages/cli-v3/src/workers/prod/entry-point.ts
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ class ProdWorker {
process.exit(1);
}
} catch (e) {
const stderr = this.#backgroundWorker.stderr.join("\n");

if (e instanceof TaskMetadataParseError) {
logger.error("tasks metadata parse error", {
zodIssues: e.zodIssues,
Expand All @@ -647,13 +649,15 @@ class ProdWorker {
name: "TaskMetadataParseError",
message: "There was an error parsing the task metadata",
stack: JSON.stringify({ zodIssues: e.zodIssues, tasks: e.tasks }),
stderr,
},
});
} else if (e instanceof UncaughtExceptionError) {
const error = {
name: e.originalError.name,
message: e.originalError.message,
stack: e.originalError.stack,
stderr,
};

logger.error("uncaught exception", { originalError: error });
Expand All @@ -668,6 +672,7 @@ class ProdWorker {
name: e.name,
message: e.message,
stack: e.stack,
stderr,
};

logger.error("error", { error });
Expand All @@ -686,6 +691,7 @@ class ProdWorker {
error: {
name: "Error",
message: e,
stderr,
},
});
} else {
Expand All @@ -697,6 +703,7 @@ class ProdWorker {
error: {
name: "Error",
message: "Unknown error",
stderr,
},
});
}
Expand Down
Loading
Loading