Skip to content

Commit 5cf90da

Browse files
authored
v3: fix unfreezable state crashes for runs with multiple waits (#1253)
* support named capture groups * write crash errors to attempt.error * make restored pod names unique per checkpoint * use last eight characters of checkpoint id instead * add more chaos monkey env vars * Ignore unfreezable states * prevent excessive queue config parsing errors * handle dependency resume edge case * better entry point logging * ignore checkpoint cancellation timeouts * add missing idempotency keys to wait for dep replays * remove checkpoints between attempts * fix retry container names on kubernetes * add changeset * fix types * bring back internal duration timers
1 parent da6ce3c commit 5cf90da

File tree

22 files changed

+453
-342
lines changed

22 files changed

+453
-342
lines changed

.changeset/hungry-sloths-promise.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
"@trigger.dev/core-apps": patch
3+
"trigger.dev": patch
4+
"@trigger.dev/core": patch
5+
---
6+
7+
Fix issues that could result in unreezable state run crashes. Details:
8+
- Never checkpoint between attempts
9+
- Some messages and socket data now include attempt numbers
10+
- Remove attempt completion replays
11+
- Additional prod entry point logging
12+
- Fail runs that receive deprecated (pre-lazy attempt) execute messages

apps/coordinator/src/chaosMonkey.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@ export class ChaosMonkey {
1212
private chaosEventRate = 0.2;
1313
private delayInSeconds = 45;
1414

15-
constructor(private enabled = false) {
15+
constructor(
16+
private enabled = false,
17+
private disableErrors = false,
18+
private disableDelays = false
19+
) {
1620
if (this.enabled) {
1721
console.log("🍌 Chaos monkey enabled");
1822
}
@@ -32,8 +36,8 @@ export class ChaosMonkey {
3236

3337
async call({
3438
$,
35-
throwErrors = true,
36-
addDelays = true,
39+
throwErrors = !this.disableErrors,
40+
addDelays = !this.disableDelays,
3741
}: {
3842
$?: Execa$<string>;
3943
throwErrors?: boolean;

apps/coordinator/src/checkpointer.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type CheckpointAndPushOptions = {
1717
projectRef: string;
1818
deploymentVersion: string;
1919
shouldHeartbeat?: boolean;
20+
attemptNumber?: number;
2021
};
2122

2223
type CheckpointAndPushResult =
@@ -258,6 +259,7 @@ export class Checkpointer {
258259
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
259260
projectRef,
260261
deploymentVersion,
262+
attemptNumber,
261263
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
262264
this.#logger.log("Checkpointing with backoff", {
263265
runId,
@@ -297,6 +299,7 @@ export class Checkpointer {
297299
leaveRunning,
298300
projectRef,
299301
deploymentVersion,
302+
attemptNumber,
300303
});
301304

302305
if (result.success) {
@@ -359,6 +362,7 @@ export class Checkpointer {
359362
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
360363
projectRef,
361364
deploymentVersion,
365+
attemptNumber,
362366
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
363367
await this.init();
364368

@@ -367,6 +371,7 @@ export class Checkpointer {
367371
leaveRunning,
368372
projectRef,
369373
deploymentVersion,
374+
attemptNumber,
370375
};
371376

372377
if (!this.#dockerMode && !this.#canCheckpoint) {
@@ -417,7 +422,7 @@ export class Checkpointer {
417422

418423
this.#logger.log("Checkpointing:", { options });
419424

420-
const containterName = this.#getRunContainerName(runId);
425+
const containterName = this.#getRunContainerName(runId, attemptNumber);
421426

422427
// Create checkpoint (docker)
423428
if (this.#dockerMode) {
@@ -581,7 +586,7 @@ export class Checkpointer {
581586
return this.#failedCheckpoints.has(runId);
582587
}
583588

584-
#getRunContainerName(suffix: string) {
585-
return `task-run-${suffix}`;
589+
#getRunContainerName(suffix: string, attemptNumber?: number) {
590+
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
586591
}
587592
}

apps/coordinator/src/index.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,11 @@ const PLATFORM_SECRET = process.env.PLATFORM_SECRET || "coordinator-secret";
4949
const SECURE_CONNECTION = ["1", "true"].includes(process.env.SECURE_CONNECTION ?? "false");
5050

5151
const logger = new SimpleLogger(`[${NODE_NAME}]`);
52-
const chaosMonkey = new ChaosMonkey(!!process.env.CHAOS_MONKEY_ENABLED);
52+
const chaosMonkey = new ChaosMonkey(
53+
!!process.env.CHAOS_MONKEY_ENABLED,
54+
!!process.env.CHAOS_MONKEY_DISABLE_ERRORS,
55+
!!process.env.CHAOS_MONKEY_DISABLE_DELAYS
56+
);
5357

5458
class TaskCoordinator {
5559
#httpServer: ReturnType<typeof createServer>;
@@ -290,6 +294,7 @@ class TaskCoordinator {
290294
setSocketDataFromHeader("projectRef", "x-trigger-project-ref");
291295
setSocketDataFromHeader("runId", "x-trigger-run-id");
292296
setSocketDataFromHeader("attemptFriendlyId", "x-trigger-attempt-friendly-id", false);
297+
setSocketDataFromHeader("attemptNumber", "x-trigger-attempt-number", false);
293298
setSocketDataFromHeader("envId", "x-trigger-env-id");
294299
setSocketDataFromHeader("deploymentId", "x-trigger-deployment-id");
295300
setSocketDataFromHeader("deploymentVersion", "x-trigger-deployment-version");
@@ -306,6 +311,10 @@ class TaskCoordinator {
306311
onConnection: async (socket, handler, sender) => {
307312
const logger = new SimpleLogger(`[prod-worker][${socket.id}]`);
308313

314+
const getAttemptNumber = () => {
315+
return socket.data.attemptNumber ? parseInt(socket.data.attemptNumber) : undefined;
316+
};
317+
309318
const crashRun = async (error: { name: string; message: string; stack?: string }) => {
310319
try {
311320
this.#platformSocket?.send("RUN_CRASHED", {
@@ -381,6 +390,10 @@ class TaskCoordinator {
381390
socket.data.attemptFriendlyId = attemptFriendlyId;
382391
};
383392

393+
const updateAttemptNumber = (attemptNumber: string | number) => {
394+
socket.data.attemptNumber = String(attemptNumber);
395+
};
396+
384397
this.#platformSocket?.send("LOG", {
385398
metadata: socket.data,
386399
text: "connected",
@@ -430,6 +443,7 @@ class TaskCoordinator {
430443
});
431444

432445
updateAttemptFriendlyId(executionAck.payload.execution.attempt.id);
446+
updateAttemptNumber(executionAck.payload.execution.attempt.number);
433447
} catch (error) {
434448
logger.error("Error", { error });
435449

@@ -505,11 +519,17 @@ class TaskCoordinator {
505519

506520
updateAttemptFriendlyId(message.attemptFriendlyId);
507521

508-
this.#platformSocket?.send("READY_FOR_RESUME", message);
522+
if (message.version === "v2") {
523+
updateAttemptNumber(message.attemptNumber);
524+
}
525+
526+
this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" });
509527
});
510528

511529
// MARK: RUN COMPLETED
512-
socket.on("TASK_RUN_COMPLETED", async ({ completion, execution }, callback) => {
530+
socket.on("TASK_RUN_COMPLETED", async (message, callback) => {
531+
const { completion, execution } = message;
532+
513533
logger.log("completed task", { completionId: completion.id });
514534

515535
// Cancel all in-progress checkpoints (if any)
@@ -518,8 +538,10 @@ class TaskCoordinator {
518538
await chaosMonkey.call({ throwErrors: false });
519539

520540
const completeWithoutCheckpoint = (shouldExit: boolean) => {
541+
const supportsRetryCheckpoints = message.version === "v1";
542+
521543
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
522-
version: "v1",
544+
version: supportsRetryCheckpoints ? "v1" : "v2",
523545
execution,
524546
completion,
525547
});
@@ -549,6 +571,11 @@ class TaskCoordinator {
549571
return;
550572
}
551573

574+
if (message.version === "v2") {
575+
completeWithoutCheckpoint(true);
576+
return;
577+
}
578+
552579
const { canCheckpoint, willSimulate } = await this.#checkpointer.init();
553580

554581
const willCheckpointAndRestore = canCheckpoint || willSimulate;
@@ -681,6 +708,7 @@ class TaskCoordinator {
681708
runId: socket.data.runId,
682709
projectRef: socket.data.projectRef,
683710
deploymentVersion: socket.data.deploymentVersion,
711+
attemptNumber: getAttemptNumber(),
684712
});
685713

686714
if (!checkpoint) {
@@ -752,6 +780,7 @@ class TaskCoordinator {
752780
runId: socket.data.runId,
753781
projectRef: socket.data.projectRef,
754782
deploymentVersion: socket.data.deploymentVersion,
783+
attemptNumber: getAttemptNumber(),
755784
});
756785

757786
if (!checkpoint) {
@@ -821,6 +850,7 @@ class TaskCoordinator {
821850
runId: socket.data.runId,
822851
projectRef: socket.data.projectRef,
823852
deploymentVersion: socket.data.deploymentVersion,
853+
attemptNumber: getAttemptNumber(),
824854
});
825855

826856
if (!checkpoint) {
@@ -905,6 +935,7 @@ class TaskCoordinator {
905935
}
906936

907937
updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id);
938+
updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number);
908939

909940
callback({
910941
success: true,
@@ -924,6 +955,10 @@ class TaskCoordinator {
924955
if (message.attemptFriendlyId) {
925956
updateAttemptFriendlyId(message.attemptFriendlyId);
926957
}
958+
959+
if (message.attemptNumber) {
960+
updateAttemptNumber(message.attemptNumber);
961+
}
927962
});
928963
},
929964
onDisconnect: async (socket, handler, sender, logger) => {

apps/docker-provider/src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ class DockerTaskOperations implements TaskOperations {
109109
async create(opts: TaskOperationsCreateOptions) {
110110
await this.init();
111111

112-
const containerName = this.#getRunContainerName(opts.runId);
112+
const containerName = this.#getRunContainerName(opts.runId, opts.nextAttemptNumber);
113113

114114
const runArgs = [
115115
"run",
@@ -150,7 +150,7 @@ class DockerTaskOperations implements TaskOperations {
150150
async restore(opts: TaskOperationsRestoreOptions) {
151151
await this.init();
152152

153-
const containerName = this.#getRunContainerName(opts.runId);
153+
const containerName = this.#getRunContainerName(opts.runId, opts.attemptNumber);
154154

155155
if (!this.#canCheckpoint || this.opts.forceSimulate) {
156156
logger.log("Simulating restore");
@@ -195,8 +195,8 @@ class DockerTaskOperations implements TaskOperations {
195195
return `task-index-${suffix}`;
196196
}
197197

198-
#getRunContainerName(suffix: string) {
199-
return `task-run-${suffix}`;
198+
#getRunContainerName(suffix: string, attemptNumber?: number) {
199+
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
200200
}
201201

202202
async #sendPostStart(containerName: string): Promise<void> {

apps/docker-provider/tsconfig.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"compilerOptions": {
3-
"target": "es2016",
3+
"target": "es2018",
44
"module": "commonjs",
55
"esModuleInterop": true,
66
"forceConsistentCasingInFileNames": true,

apps/kubernetes-provider/src/index.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,12 @@ class KubernetesTaskOperations implements TaskOperations {
139139
}
140140

141141
async create(opts: TaskOperationsCreateOptions) {
142+
const containerName = this.#getRunContainerName(opts.runId, opts.nextAttemptNumber);
143+
142144
await this.#createPod(
143145
{
144146
metadata: {
145-
name: this.#getRunContainerName(opts.runId),
147+
name: containerName,
146148
namespace: this.#namespace.metadata.name,
147149
labels: {
148150
...this.#getSharedLabels(opts),
@@ -157,7 +159,7 @@ class KubernetesTaskOperations implements TaskOperations {
157159
terminationGracePeriodSeconds: 60 * 60,
158160
containers: [
159161
{
160-
name: this.#getRunContainerName(opts.runId),
162+
name: containerName,
161163
image: opts.image,
162164
ports: [
163165
{
@@ -211,7 +213,7 @@ class KubernetesTaskOperations implements TaskOperations {
211213
await this.#createPod(
212214
{
213215
metadata: {
214-
name: `${this.#getRunContainerName(opts.runId)}-${randomUUID().slice(0, 8)}`,
216+
name: `${this.#getRunContainerName(opts.runId)}-${opts.checkpointId.slice(-8)}`,
215217
namespace: this.#namespace.metadata.name,
216218
labels: {
217219
...this.#getSharedLabels(opts),
@@ -514,8 +516,8 @@ class KubernetesTaskOperations implements TaskOperations {
514516
return `task-index-${suffix}`;
515517
}
516518

517-
#getRunContainerName(suffix: string) {
518-
return `task-run-${suffix}`;
519+
#getRunContainerName(suffix: string, attemptNumber?: number) {
520+
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
519521
}
520522

521523
#getPrePullContainerName(suffix: string) {

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
2-
import { TaskRunStatus } from "@trigger.dev/database";
32
import { logger } from "~/services/logger.server";
43
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
54
import { BaseService } from "./services/baseService.server";
65
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
7-
8-
const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];
6+
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
97

108
export class FailedTaskRunService extends BaseService {
119
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
@@ -27,7 +25,7 @@ export class FailedTaskRunService extends BaseService {
2725
return;
2826
}
2927

30-
if (!FAILABLE_TASK_RUN_STATUSES.includes(taskRun.status)) {
28+
if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
3129
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
3230
taskRun,
3331
completion,

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ function createCoordinatorNamespace(io: Server) {
128128
completion: message.completion,
129129
execution: message.execution,
130130
checkpoint: message.checkpoint,
131+
supportsRetryCheckpoints: message.version === "v1",
131132
});
132133
},
133134
TASK_RUN_FAILED_TO_RUN: async (message) => {

0 commit comments

Comments
 (0)