Skip to content

Commit d9f64a7

Browse files
committed
remove checkpoints between attempts
1 parent 82d24d6 commit d9f64a7

File tree

15 files changed

+194
-192
lines changed

15 files changed

+194
-192
lines changed

apps/coordinator/src/checkpointer.ts

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ type CheckpointAndPushOptions = {
1717
projectRef: string;
1818
deploymentVersion: string;
1919
shouldHeartbeat?: boolean;
20+
attemptNumber?: number;
2021
};
2122

2223
type CheckpointAndPushResult =
@@ -258,6 +259,7 @@ export class Checkpointer {
258259
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
259260
projectRef,
260261
deploymentVersion,
262+
attemptNumber,
261263
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
262264
this.#logger.log("Checkpointing with backoff", {
263265
runId,
@@ -297,6 +299,7 @@ export class Checkpointer {
297299
leaveRunning,
298300
projectRef,
299301
deploymentVersion,
302+
attemptNumber,
300303
});
301304

302305
if (result.success) {
@@ -359,6 +362,7 @@ export class Checkpointer {
359362
leaveRunning = true, // This mirrors kubernetes behaviour more accurately
360363
projectRef,
361364
deploymentVersion,
365+
attemptNumber,
362366
}: CheckpointAndPushOptions): Promise<CheckpointAndPushResult> {
363367
await this.init();
364368

@@ -367,6 +371,7 @@ export class Checkpointer {
367371
leaveRunning,
368372
projectRef,
369373
deploymentVersion,
374+
attemptNumber,
370375
};
371376

372377
if (!this.#dockerMode && !this.#canCheckpoint) {
@@ -417,7 +422,7 @@ export class Checkpointer {
417422

418423
this.#logger.log("Checkpointing:", { options });
419424

420-
const containterName = this.#getRunContainerName(runId);
425+
const containterName = this.#getRunContainerName(runId, attemptNumber);
421426

422427
// Create checkpoint (docker)
423428
if (this.#dockerMode) {
@@ -581,7 +586,7 @@ export class Checkpointer {
581586
return this.#failedCheckpoints.has(runId);
582587
}
583588

584-
#getRunContainerName(suffix: string) {
585-
return `task-run-${suffix}`;
589+
#getRunContainerName(suffix: string, attemptNumber?: number) {
590+
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
586591
}
587592
}

apps/coordinator/src/index.ts

Lines changed: 34 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -294,6 +294,7 @@ class TaskCoordinator {
294294
setSocketDataFromHeader("projectRef", "x-trigger-project-ref");
295295
setSocketDataFromHeader("runId", "x-trigger-run-id");
296296
setSocketDataFromHeader("attemptFriendlyId", "x-trigger-attempt-friendly-id", false);
297+
setSocketDataFromHeader("attemptNumber", "x-trigger-attempt-number", false);
297298
setSocketDataFromHeader("envId", "x-trigger-env-id");
298299
setSocketDataFromHeader("deploymentId", "x-trigger-deployment-id");
299300
setSocketDataFromHeader("deploymentVersion", "x-trigger-deployment-version");
@@ -310,6 +311,10 @@ class TaskCoordinator {
310311
onConnection: async (socket, handler, sender) => {
311312
const logger = new SimpleLogger(`[prod-worker][${socket.id}]`);
312313

314+
const getAttemptNumber = () => {
315+
return socket.data.attemptNumber ? parseInt(socket.data.attemptNumber) : undefined;
316+
};
317+
313318
const crashRun = async (error: { name: string; message: string; stack?: string }) => {
314319
try {
315320
this.#platformSocket?.send("RUN_CRASHED", {
@@ -385,6 +390,10 @@ class TaskCoordinator {
385390
socket.data.attemptFriendlyId = attemptFriendlyId;
386391
};
387392

393+
const updateAttemptNumber = (attemptNumber: string | number) => {
394+
socket.data.attemptNumber = String(attemptNumber);
395+
};
396+
388397
this.#platformSocket?.send("LOG", {
389398
metadata: socket.data,
390399
text: "connected",
@@ -434,6 +443,7 @@ class TaskCoordinator {
434443
});
435444

436445
updateAttemptFriendlyId(executionAck.payload.execution.attempt.id);
446+
updateAttemptNumber(executionAck.payload.execution.attempt.number);
437447
} catch (error) {
438448
logger.error("Error", { error });
439449

@@ -509,11 +519,17 @@ class TaskCoordinator {
509519

510520
updateAttemptFriendlyId(message.attemptFriendlyId);
511521

512-
this.#platformSocket?.send("READY_FOR_RESUME", message);
522+
if (message.version === "v2") {
523+
updateAttemptNumber(message.attemptNumber);
524+
}
525+
526+
this.#platformSocket?.send("READY_FOR_RESUME", { ...message, version: "v1" });
513527
});
514528

515529
// MARK: RUN COMPLETED
516-
socket.on("TASK_RUN_COMPLETED", async ({ completion, execution }, callback) => {
530+
socket.on("TASK_RUN_COMPLETED", async (message, callback) => {
531+
const { completion, execution } = message;
532+
517533
logger.log("completed task", { completionId: completion.id });
518534

519535
// Cancel all in-progress checkpoints (if any)
@@ -522,8 +538,10 @@ class TaskCoordinator {
522538
await chaosMonkey.call({ throwErrors: false });
523539

524540
const completeWithoutCheckpoint = (shouldExit: boolean) => {
541+
const supportsRetryCheckpoints = message.version === "v1";
542+
525543
this.#platformSocket?.send("TASK_RUN_COMPLETED", {
526-
version: "v1",
544+
version: supportsRetryCheckpoints ? "v1" : "v2",
527545
execution,
528546
completion,
529547
});
@@ -553,6 +571,11 @@ class TaskCoordinator {
553571
return;
554572
}
555573

574+
if (message.version === "v2") {
575+
completeWithoutCheckpoint(true);
576+
return;
577+
}
578+
556579
const { canCheckpoint, willSimulate } = await this.#checkpointer.init();
557580

558581
const willCheckpointAndRestore = canCheckpoint || willSimulate;
@@ -685,6 +708,7 @@ class TaskCoordinator {
685708
runId: socket.data.runId,
686709
projectRef: socket.data.projectRef,
687710
deploymentVersion: socket.data.deploymentVersion,
711+
attemptNumber: getAttemptNumber(),
688712
});
689713

690714
if (!checkpoint) {
@@ -756,6 +780,7 @@ class TaskCoordinator {
756780
runId: socket.data.runId,
757781
projectRef: socket.data.projectRef,
758782
deploymentVersion: socket.data.deploymentVersion,
783+
attemptNumber: getAttemptNumber(),
759784
});
760785

761786
if (!checkpoint) {
@@ -825,6 +850,7 @@ class TaskCoordinator {
825850
runId: socket.data.runId,
826851
projectRef: socket.data.projectRef,
827852
deploymentVersion: socket.data.deploymentVersion,
853+
attemptNumber: getAttemptNumber(),
828854
});
829855

830856
if (!checkpoint) {
@@ -909,6 +935,7 @@ class TaskCoordinator {
909935
}
910936

911937
updateAttemptFriendlyId(createAttempt.executionPayload.execution.attempt.id);
938+
updateAttemptNumber(createAttempt.executionPayload.execution.attempt.number);
912939

913940
callback({
914941
success: true,
@@ -928,6 +955,10 @@ class TaskCoordinator {
928955
if (message.attemptFriendlyId) {
929956
updateAttemptFriendlyId(message.attemptFriendlyId);
930957
}
958+
959+
if (message.attemptNumber) {
960+
updateAttemptNumber(message.attemptNumber);
961+
}
931962
});
932963
},
933964
onDisconnect: async (socket, handler, sender, logger) => {

apps/docker-provider/src/index.ts

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,7 @@ class DockerTaskOperations implements TaskOperations {
109109
async create(opts: TaskOperationsCreateOptions) {
110110
await this.init();
111111

112-
const containerName = this.#getRunContainerName(opts.runId);
112+
const containerName = this.#getRunContainerName(opts.runId, opts.nextAttemptNumber);
113113

114114
const runArgs = [
115115
"run",
@@ -150,7 +150,7 @@ class DockerTaskOperations implements TaskOperations {
150150
async restore(opts: TaskOperationsRestoreOptions) {
151151
await this.init();
152152

153-
const containerName = this.#getRunContainerName(opts.runId);
153+
const containerName = this.#getRunContainerName(opts.runId, opts.attemptNumber);
154154

155155
if (!this.#canCheckpoint || this.opts.forceSimulate) {
156156
logger.log("Simulating restore");
@@ -195,8 +195,8 @@ class DockerTaskOperations implements TaskOperations {
195195
return `task-index-${suffix}`;
196196
}
197197

198-
#getRunContainerName(suffix: string) {
199-
return `task-run-${suffix}`;
198+
#getRunContainerName(suffix: string, attemptNumber?: number) {
199+
return `task-run-${suffix}${attemptNumber && attemptNumber > 1 ? `-att${attemptNumber}` : ""}`;
200200
}
201201

202202
async #sendPostStart(containerName: string): Promise<void> {

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,9 @@
11
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
2-
import { TaskRunStatus } from "@trigger.dev/database";
32
import { logger } from "~/services/logger.server";
43
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
54
import { BaseService } from "./services/baseService.server";
65
import { FinalizeTaskRunService } from "./services/finalizeTaskRun.server";
7-
8-
const FAILABLE_TASK_RUN_STATUSES: TaskRunStatus[] = ["EXECUTING", "PENDING", "WAITING_FOR_DEPLOY"];
6+
import { FAILABLE_RUN_STATUSES } from "./taskStatus";
97

108
export class FailedTaskRunService extends BaseService {
119
public async call(anyRunId: string, completion: TaskRunFailedExecutionResult) {
@@ -27,7 +25,7 @@ export class FailedTaskRunService extends BaseService {
2725
return;
2826
}
2927

30-
if (!FAILABLE_TASK_RUN_STATUSES.includes(taskRun.status)) {
28+
if (!FAILABLE_RUN_STATUSES.includes(taskRun.status)) {
3129
logger.error("[FailedTaskRunService] Task run is not in a failable state", {
3230
taskRun,
3331
completion,

apps/webapp/app/v3/handleSocketIo.server.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -128,6 +128,7 @@ function createCoordinatorNamespace(io: Server) {
128128
completion: message.completion,
129129
execution: message.execution,
130130
checkpoint: message.checkpoint,
131+
supportsRetryCheckpoints: message.version === "v1",
131132
});
132133
},
133134
TASK_RUN_FAILED_TO_RUN: async (message) => {

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -54,6 +54,7 @@ export const SharedQueueMessageBody = z.discriminatedUnion("type", [
5454
type: z.literal("EXECUTE"),
5555
taskIdentifier: z.string(),
5656
checkpointEventId: z.string().optional(),
57+
retryCheckpointsDisabled: z.boolean().optional(),
5758
}),
5859
WithTraceContext.extend({
5960
type: z.literal("RESUME"),
@@ -294,12 +295,9 @@ export class SharedQueueConsumer {
294295

295296
const retryingFromCheckpoint = !!messageBody.data.checkpointEventId;
296297

297-
const EXECUTABLE_RUN_STATUSES: {
298-
fromCheckpoint: TaskRunStatus[];
299-
withoutCheckpoint: TaskRunStatus[];
300-
} = {
301-
fromCheckpoint: ["WAITING_TO_RESUME"],
302-
withoutCheckpoint: ["PENDING", "RETRYING_AFTER_FAILURE"],
298+
const EXECUTABLE_RUN_STATUSES = {
299+
fromCheckpoint: ["WAITING_TO_RESUME"] satisfies TaskRunStatus[],
300+
withoutCheckpoint: ["PENDING", "RETRYING_AFTER_FAILURE"] satisfies TaskRunStatus[],
303301
};
304302

305303
if (
@@ -474,7 +472,10 @@ export class SharedQueueConsumer {
474472
? lockedTaskRun.attempts[0].number + 1
475473
: 1;
476474

477-
const isRetry = lockedTaskRun.status === "WAITING_TO_RESUME" && nextAttemptNumber > 1;
475+
const isRetry =
476+
nextAttemptNumber > 1 &&
477+
(lockedTaskRun.status === "WAITING_TO_RESUME" ||
478+
lockedTaskRun.status === "RETRYING_AFTER_FAILURE");
478479

479480
try {
480481
if (messageBody.data.checkpointEventId) {
@@ -515,11 +516,13 @@ export class SharedQueueConsumer {
515516
}
516517
}
517518

518-
if (isRetry) {
519+
if (isRetry && !messageBody.data.retryCheckpointsDisabled) {
519520
socketIo.coordinatorNamespace.emit("READY_FOR_RETRY", {
520521
version: "v1",
521522
runId: lockedTaskRun.id,
522523
});
524+
525+
// Retries for workers with retry checkpoints disabled skip READY_FOR_RETRY and take the else branch below, where they are handled just like normal attempts
523526
} else {
524527
const machineConfig = lockedTaskRun.lockedBy?.machineConfig;
525528
const machine = machinePresetFromConfig(machineConfig ?? {});
@@ -531,6 +534,7 @@ export class SharedQueueConsumer {
531534
image: deployment.imageReference,
532535
version: deployment.version,
533536
machine,
537+
nextAttemptNumber,
534538
// identifiers
535539
id: "placeholder", // TODO: Remove this completely in a future release
536540
envId: lockedTaskRun.runtimeEnvironment.id,

0 commit comments

Comments
 (0)