Skip to content

v3: fix unfreezable state crashes for runs with multiple waits #1253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .changeset/hungry-sloths-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
"@trigger.dev/core-apps": patch
"trigger.dev": patch
"@trigger.dev/core": patch
---

Fix issues that could result in unfreezable state run crashes. Details:
- Never checkpoint between attempts
- Some messages and socket data now include attempt numbers
- Remove attempt completion replays
- Additional prod entry point logging
- Fail runs that receive deprecated (pre-lazy attempt) execute messages
10 changes: 7 additions & 3 deletions apps/coordinator/src/chaosMonkey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ export class ChaosMonkey {
private chaosEventRate = 0.2;
private delayInSeconds = 45;

constructor(private enabled = false) {
constructor(
private enabled = false,
private disableErrors = false,
private disableDelays = false
) {
if (this.enabled) {
console.log("🍌 Chaos monkey enabled");
}
Expand All @@ -32,8 +36,8 @@ export class ChaosMonkey {

async call({
$,
throwErrors = true,
addDelays = true,
throwErrors = !this.disableErrors,
addDelays = !this.disableDelays,
}: {
$?: Execa$<string>;
throwErrors?: boolean;
Expand Down
11 changes: 8 additions & 3 deletions apps/coordinator/src/checkpointer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CheckpointAndPushOptions = {
projectRef: string;
deploymentVersion: string;
shouldHeartbeat?: boolean;
attemptNumber?: number;
};

type CheckpointAndPushResult =
Expand Down Expand Up @@ -258,6 +259,7 @@ export class Checkpointer {
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
projectRef,
deploymentVersion,
attemptNumber,
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
this.#logger.log("Checkpointing with backoff", {
runId,
Expand Down Expand Up @@ -297,6 +299,7 @@ export class Checkpointer {
leaveRunning,
projectRef,
deploymentVersion,
attemptNumber,
});

if (result.success) {
Expand Down Expand Up @@ -359,6 +362,7 @@ export class Checkpointer {
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
projectRef,
deploymentVersion,
attemptNumber,
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
await this.init();

Expand All @@ -367,6 +371,7 @@ export class Checkpointer {
leaveRunning,
projectRef,
deploymentVersion,
attemptNumber,
};

if (!this.#dockerMode && !this.#canCheckpoint) {
Expand Down Expand Up @@ -417,7 +422,7 @@ export class Checkpointer {

this.#logger.log("Checkpointing:", { options });

const containterName = this.#getRunContainerName(runId);
const containterName = this.#getRunContainerName(runId, attemptNumber);

// Create checkpoint (docker)
if (this.#dockerMode) {
Expand Down Expand Up @@ -581,7 +586,7 @@ export class Checkpointer {
return this.#failedCheckpoints.has(runId);
}

#getRunContainerName(suffix: string) {
return `task-run-${suffix}`;
#getRunContainerName(suffix: string, attemptNumber?: number) {
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
}
}
43 changes: 39 additions & 4 deletions apps/coordinator/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");

const logger = new SimpleLogger(`[${NODE_NAME}]`);
const chaosMonkey = new ChaosMonkey(!!process.env.CHAOS_MONKEY_ENABLED);
const chaosMonkey = new ChaosMonkey(
!!process.env.CHAOS_MONKEY_ENABLED,
!!process.env.CHAOS_MONKEY_DISABLE_ERRORS,
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
);

class TaskCoordinator {
#httpServer: ReturnType<typeof createServer>;
Expand Down Expand Up @@ -290,6 +294,7 @@ class TaskCoordinator {
setSocketDataFromHeader("projectRef", "x-trigger-project-ref");
setSocketDataFromHeader("runId", "x-trigger-run-id");
setSocketDataFromHeader("attemptFriendlyId", "x-trigger-attempt-friendly-id", false);
setSocketDataFromHeader("attemptNumber", "x-trigger-attempt-number", false);
setSocketDataFromHeader("envId", "x-trigger-env-id");
setSocketDataFromHeader("deploymentId", "x-trigger-deployment-id");
setSocketDataFromHeader("deploymentVersion", "x-trigger-deployment-version");
Expand All @@ -306,6 +311,10 @@ class TaskCoordinator {
onConnection: async (socket, handler, sender) => {
const logger = new SimpleLogger(`[prod-worker][${socket.id}]`);

const getAttemptNumber = () => {
return socket.data.attemptNumber ? parseInt(socket.data.attemptNumber) : undefined;
};

const crashRun = async (error: { name: string; message: string; stack?: string }) => {
try {
this.#platformSocket?.send("RUN_CRASHED", {
Expand Down Expand Up @@ -381,6 +390,10 @@ class TaskCoordinator {
socket.data.attemptFriendlyId = attemptFriendlyId;
};

const updateAttemptNumber = (attemptNumber: string | number) => {
socket.data.attemptNumber = String(attemptNumber);
};

this.#platformSocket?.send("LOG", {
metadata: socket.data,
text: "connected",
Expand Down Expand Up @@ -430,6 +443,7 @@ class TaskCoordinator {
});

updateAttemptFriendlyId(executionAck.payload.execution.attempt.id);
updateAttemptNumber(executionAck.payload.execution.attempt.number);
} catch (error) {
logger.error("Error", { error });

Expand Down Expand Up @@ -505,11 +519,17 @@ class TaskCoordinator {

updateAttemptFriendlyId(message.attemptFriendlyId);

this.#platformSocket?.send("READY_FOR_RESUME", message);
if (message.version === "v2") {
updateAttemptNumber(message.attemptNumber);
}

this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" });
});

// MARK: RUN COMPLETED
socket.on("TASK_RUN_COMPLETED", async ({ completion, execution }, callback) => {
socket.on("TASK_RUN_COMPLETED", async (message, callback) => {
const { completion, execution } = message;

logger.log("completed task", { completionId: completion.id });

// Cancel all in-progress checkpoints (if any)
Expand All @@ -518,8 +538,10 @@ class TaskCoordinator {
await chaosMonkey.call({ throwErrors: false });

const completeWithoutCheckpoint = (shouldExit: boolean) => {
const supportsRetryCheckpoints = message.version === "v1";

this.#platformSocket?.send("TASK_RUN_COMPLETED", {
version: "v1",
version: supportsRetryCheckpoints ? "v1" : "v2",
execution,
completion,
});
Expand Down Expand Up @@ -549,6 +571,11 @@ class TaskCoordinator {
return;
}

if (message.version === "v2") {
completeWithoutCheckpoint(true);
return;
}

const { canCheckpoint, willSimulate } = await this.#checkpointer.init();

const willCheckpointAndRestore = canCheckpoint || willSimulate;
Expand Down Expand Up @@ -681,6 +708,7 @@ class TaskCoordinator {
runId: socket.data.runId,
projectRef: socket.data.projectRef,
deploymentVersion: socket.data.deploymentVersion,
attemptNumber: getAttemptNumber(),
});

if (!checkpoint) {
Expand Down Expand Up @@ -752,6 +780,7 @@ class TaskCoordinator {
runId: socket.data.runId,
projectRef: socket.data.projectRef,
deploymentVersion: socket.data.deploymentVersion,
attemptNumber: getAttemptNumber(),
});

if (!checkpoint) {
Expand Down Expand Up @@ -821,6 +850,7 @@ class TaskCoordinator {
runId: socket.data.runId,
projectRef: socket.data.projectRef,
deploymentVersion: socket.data.deploymentVersion,
attemptNumber: getAttemptNumber(),
});

if (!checkpoint) {
Expand Down Expand Up @@ -905,6 +935,7 @@ class TaskCoordinator {
}

updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id);
updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number);

callback({
success: true,
Expand All @@ -924,6 +955,10 @@ class TaskCoordinator {
if (message.attemptFriendlyId) {
updateAttemptFriendlyId(message.attemptFriendlyId);
}

if (message.attemptNumber) {
updateAttemptNumber(message.attemptNumber);
}
});
},
onDisconnect: async (socket, handler, sender, logger) => {
Expand Down
8 changes: 4 additions & 4 deletions apps/docker-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class DockerTaskOperations implements TaskOperations {
async create(opts: TaskOperationsCreateOptions) {
await this.init();

const containerName = this.#getRunContainerName(opts.runId);
const containerName = this.#getRunContainerName(opts.runId, opts.nextAttemptNumber);

const runArgs = [
"run",
Expand Down Expand Up @@ -150,7 +150,7 @@ class DockerTaskOperations implements TaskOperations {
async restore(opts: TaskOperationsRestoreOptions) {
await this.init();

const containerName = this.#getRunContainerName(opts.runId);
const containerName = this.#getRunContainerName(opts.runId, opts.attemptNumber);

if (!this.#canCheckpoint || this.opts.forceSimulate) {
logger.log("Simulating restore");
Expand Down Expand Up @@ -195,8 +195,8 @@ class DockerTaskOperations implements TaskOperations {
return `task-index-${suffix}`;
}

#getRunContainerName(suffix: string) {
return `task-run-${suffix}`;
#getRunContainerName(suffix: string, attemptNumber?: number) {
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
}

async #sendPostStart(containerName: string): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion apps/docker-provider/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"compilerOptions": {
"target": "es2016",
"target": "es2018",
"module": "commonjs",
"esModuleInterop": true,
"forceConsistentCasingInFileNames": true,
Expand Down
12 changes: 7 additions & 5 deletions apps/kubernetes-provider/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,12 @@ class KubernetesTaskOperations implements TaskOperations {
}

async create(opts: TaskOperationsCreateOptions) {
const containerName = this.#getRunContainerName(opts.runId, opts.nextAttemptNumber);

await this.#createPod(
{
metadata: {
name: this.#getRunContainerName(opts.runId),
name: containerName,
namespace: this.#namespace.metadata.name,
labels: {
...this.#getSharedLabels(opts),
Expand All @@ -157,7 +159,7 @@ class KubernetesTaskOperations implements TaskOperations {
terminationGracePeriodSeconds: 60 * 60,
containers: [
{
name: this.#getRunContainerName(opts.runId),
name: containerName,
image: opts.image,
ports: [
{
Expand Down Expand Up @@ -211,7 +213,7 @@ class KubernetesTaskOperations implements TaskOperations {
await this.#createPod(
{
metadata: {
name: `${this.#getRunContainerName(opts.runId)}-${randomUUID().slice(0, 8)}`,
name: `${this.#getRunContainerName(opts.runId)}-${opts.checkpointId.slice(-8)}`,
namespace: this.#namespace.metadata.name,
labels: {
...this.#getSharedLabels(opts),
Expand Down Expand Up @@ -514,8 +516,8 @@ class KubernetesTaskOperations implements TaskOperations {
return `task-index-${suffix}`;
}

#getRunContainerName(suffix: string) {
return `task-run-${suffix}`;
#getRunContainerName(suffix: string, attemptNumber?: number) {
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
}

#getPrePullContainerName(suffix: string) {
Expand Down
6 changes: 2 additions & 4 deletions apps/webapp/app/v3/failedTaskRun.server.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
import { TaskRunStatus } from "@trigger.dev/database";
import { logger } from "~/services/logger.server";
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
import { BaseService } from "./services/baseService.server";
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";

const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];
import { FAILABLE_RUN_STATUSES } from "./taskStatus";

export class FailedTaskRunService extends BaseService {
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
Expand All @@ -27,7 +25,7 @@ export class FailedTaskRunService extends BaseService {
return;
}

if (!FAILABLE_TASK_RUN_STATUSES.includes(taskRun.status)) {
if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
taskRun,
completion,
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/v3/handleSocketIo.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ function createCoordinatorNamespace(io: Server) {
completion: message.completion,
execution: message.execution,
checkpoint: message.checkpoint,
supportsRetryCheckpoints: message.version === "v1",
});
},
TASK_RUN_FAILED_TO_RUN: async (message) => {
Expand Down
Loading
Loading