
Commit 311aed3

Merge branch 'main' into main
2 parents ddc706a + bb65b26

9 files changed: +242 -64 lines changed

.changeset/forty-windows-shop.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/sdk": patch
+---
+
+Added the ability to retry runs that fail with an Out Of Memory (OOM) error on a larger machine.
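For context, a minimal sketch of how a task might opt into this behavior. The retry.outOfMemory.machine shape mirrors the retryConfig.outOfMemory.machine field read in completeAttempt.server.ts below; the task id, the import path, and the machine preset names ("large-1x") are assumptions for illustration, not confirmed by this commit.

import { task } from "@trigger.dev/sdk/v3";

export const memoryHungryTask = task({
  id: "memory-hungry-task", // hypothetical task id
  retry: {
    maxAttempts: 3,
    // Assumption: when a run is OOM-killed, retry it on this larger preset
    outOfMemory: { machine: "large-1x" },
  },
  run: async (payload: { input: string }) => {
    // ...memory-intensive work that may exceed the initial machine's RAM...
  },
});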

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 46 additions & 21 deletions
@@ -5,14 +5,14 @@ import {
   TaskRunExecutionRetry,
   TaskRunFailedExecutionResult,
 } from "@trigger.dev/core/v3";
+import type { Prisma, TaskRun } from "@trigger.dev/database";
+import * as semver from "semver";
 import { logger } from "~/services/logger.server";
+import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
 import { BaseService } from "./services/baseService.server";
-import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";
-import type { Prisma, TaskRun } from "@trigger.dev/database";
 import { CompleteAttemptService } from "./services/completeAttempt.server";
 import { CreateTaskRunAttemptService } from "./services/createTaskRunAttempt.server";
-import { sharedQueueTasks } from "./marqs/sharedQueueConsumer.server";
-import * as semver from "semver";
+import { isFailableRunStatus, isFinalAttemptStatus } from "./taskStatus";

 const FailedTaskRunRetryGetPayload = {
   select: {
@@ -180,13 +180,52 @@ export class FailedTaskRunRetryHelper extends BaseService {
     }
   }

-  static async getExecutionRetry({
+  static getExecutionRetry({
     run,
     execution,
   }: {
     run: TaskRunWithWorker;
     execution: TaskRunExecution;
-  }): Promise<TaskRunExecutionRetry | undefined> {
+  }): TaskRunExecutionRetry | undefined {
+    try {
+      const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({ run, execution });
+      if (!retryConfig) {
+        return;
+      }
+
+      const delay = calculateNextRetryDelay(retryConfig, execution.attempt.number);
+
+      if (!delay) {
+        logger.debug("[FailedTaskRunRetryHelper] No more retries", {
+          run,
+          execution,
+        });
+
+        return;
+      }
+
+      return {
+        timestamp: Date.now() + delay,
+        delay,
+      };
+    } catch (error) {
+      logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
+        run,
+        execution,
+        error,
+      });
+
+      return;
+    }
+  }
+
+  static getRetryConfig({
+    run,
+    execution,
+  }: {
+    run: TaskRunWithWorker;
+    execution: TaskRunExecution;
+  }): RetryOptions | undefined {
     try {
       const retryConfig = run.lockedBy?.retryConfig;

@@ -247,21 +286,7 @@ export class FailedTaskRunRetryHelper extends BaseService {
         return;
       }

-      const delay = calculateNextRetryDelay(parsedRetryConfig.data, execution.attempt.number);
-
-      if (!delay) {
-        logger.debug("[FailedTaskRunRetryHelper] No more retries", {
-          run,
-          execution,
-        });
-
-        return;
-      }
-
-      return {
-        timestamp: Date.now() + delay,
-        delay,
-      };
+      return parsedRetryConfig.data;
     } catch (error) {
       logger.error("[FailedTaskRunRetryHelper] Failed to get execution retry", {
         run,
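The refactor splits config parsing (getRetryConfig) from delay computation (getExecutionRetry), so the OOM path can read the parsed config directly. calculateNextRetryDelay comes from @trigger.dev/core/v3; below is a minimal sketch of the kind of capped exponential backoff it is used for here. The field names (maxAttempts, minTimeoutInMs, maxTimeoutInMs, factor) are assumptions, not taken from this diff.

type RetryOptionsSketch = {
  maxAttempts?: number;
  minTimeoutInMs?: number;
  maxTimeoutInMs?: number;
  factor?: number;
};

// Returns the next delay in ms, or undefined when attempts are exhausted,
// matching how the caller above treats a falsy delay as "no more retries".
function nextRetryDelay(config: RetryOptionsSketch, attempt: number): number | undefined {
  const { maxAttempts = 3, minTimeoutInMs = 1000, maxTimeoutInMs = 60_000, factor = 2 } = config;

  if (attempt >= maxAttempts) return undefined;

  // Exponential growth from the minimum delay, capped at the maximum
  const delay = minTimeoutInMs * Math.pow(factor, attempt - 1);
  return Math.min(delay, maxTimeoutInMs);
}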

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 6 additions & 3 deletions
@@ -638,7 +638,8 @@ export class MarQS {
   }

   /**
-   * Negative acknowledge a message, which will requeue the message
+   * Negative acknowledge a message, which will requeue the message.
+   * Returns whether it went back into the queue or not.
    */
   public async nackMessage(
     messageId: string,
@@ -657,7 +658,7 @@ export class MarQS {
         updates,
         service: this.name,
       });
-      return;
+      return false;
     }

     const nackCount = await this.#getNackCount(messageId);
@@ -676,7 +677,7 @@ export class MarQS {

       // If we have reached the maximum nack count, we will ack the message
       await this.acknowledgeMessage(messageId, "maximum nack count reached");
-      return;
+      return false;
     }

     span.setAttributes({
@@ -705,6 +706,8 @@ export class MarQS {
       });

       await this.options.subscriber?.messageNacked(message);
+
+      return true;
     },
     {
       kind: SpanKind.CONSUMER,
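With the boolean return, callers can tell a real requeue apart from the cases where the message was acked instead. A hypothetical caller (marqs and messageId stand in for a real instance and message id):

const requeued = await marqs.nackMessage(messageId);

if (!requeued) {
  // The message was acked rather than requeued (e.g. the maximum nack
  // count was reached), so no redelivery should be expected.
  logger.debug("Message was not requeued", { messageId });
}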

apps/webapp/app/v3/services/completeAttempt.server.ts

Lines changed: 66 additions & 2 deletions
@@ -1,13 +1,16 @@
 import { Attributes } from "@opentelemetry/api";
 import {
   TaskRunContext,
+  TaskRunError,
   TaskRunErrorCodes,
   TaskRunExecution,
   TaskRunExecutionResult,
   TaskRunExecutionRetry,
   TaskRunFailedExecutionResult,
   TaskRunSuccessfulExecutionResult,
+  exceptionEventEnhancer,
   flattenAttributes,
+  internalErrorFromUnexpectedExit,
   sanitizeError,
   shouldRetryError,
   taskRunErrorEnhancer,
@@ -233,7 +236,7 @@ export class CompleteAttemptService extends BaseService {

     if (!executionRetry && shouldInfer) {
       executionRetryInferred = true;
-      executionRetry = await FailedTaskRunRetryHelper.getExecutionRetry({
+      executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
         run: {
           ...taskRunAttempt.taskRun,
           lockedBy: taskRunAttempt.backgroundWorkerTask,
@@ -243,7 +246,47 @@ export class CompleteAttemptService extends BaseService {
       });
     }

-    const retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
+    let retriableError = shouldRetryError(taskRunErrorEnhancer(completion.error));
+    let isOOMRetry = false;
+
+    //OOM errors should retry (if an OOM machine is specified)
+    if (isOOMError(completion.error)) {
+      const retryConfig = FailedTaskRunRetryHelper.getRetryConfig({
+        run: {
+          ...taskRunAttempt.taskRun,
+          lockedBy: taskRunAttempt.backgroundWorkerTask,
+          lockedToVersion: taskRunAttempt.backgroundWorker,
+        },
+        execution,
+      });
+
+      if (
+        retryConfig?.outOfMemory?.machine &&
+        retryConfig.outOfMemory.machine !== taskRunAttempt.taskRun.machinePreset
+      ) {
+        //we will retry
+        isOOMRetry = true;
+        retriableError = true;
+        executionRetry = FailedTaskRunRetryHelper.getExecutionRetry({
+          run: {
+            ...taskRunAttempt.taskRun,
+            lockedBy: taskRunAttempt.backgroundWorkerTask,
+            lockedToVersion: taskRunAttempt.backgroundWorker,
+          },
+          execution,
+        });
+
+        //update the machine on the run
+        await this._prisma.taskRun.update({
+          where: {
+            id: taskRunAttempt.taskRunId,
+          },
+          data: {
+            machinePreset: retryConfig.outOfMemory.machine,
+          },
+        });
+      }
+    }

     if (
       retriableError &&
@@ -257,6 +300,7 @@ export class CompleteAttemptService extends BaseService {
         taskRunAttempt,
         environment,
         checkpoint,
+        forceRequeue: isOOMRetry,
       });
     }

@@ -378,12 +422,14 @@ export class CompleteAttemptService extends BaseService {
     executionRetryInferred,
     checkpointEventId,
     supportsLazyAttempts,
+    forceRequeue = false,
   }: {
     run: TaskRun;
     executionRetry: TaskRunExecutionRetry;
     executionRetryInferred: boolean;
     checkpointEventId?: string;
     supportsLazyAttempts: boolean;
+    forceRequeue?: boolean;
   }) {
     const retryViaQueue = () => {
       logger.debug("[CompleteAttemptService] Enqueuing retry attempt", { runId: run.id });
@@ -434,6 +480,12 @@ export class CompleteAttemptService extends BaseService {
       return;
     }

+    if (forceRequeue) {
+      logger.debug("[CompleteAttemptService] Forcing retry via queue", { runId: run.id });
+      await retryViaQueue();
+      return;
+    }
+
     // Workers that never checkpoint between attempts will exit after completing their current attempt if the retry delay exceeds the threshold
     if (
       !this.opts.supportsRetryCheckpoints &&
@@ -466,13 +518,15 @@ export class CompleteAttemptService extends BaseService {
     taskRunAttempt,
     environment,
     checkpoint,
+    forceRequeue = false,
   }: {
     execution: TaskRunExecution;
     executionRetry: TaskRunExecutionRetry;
     executionRetryInferred: boolean;
     taskRunAttempt: NonNullable<FoundAttempt>;
     environment: AuthenticatedEnvironment;
     checkpoint?: CheckpointData;
+    forceRequeue?: boolean;
   }) {
     const retryAt = new Date(executionRetry.timestamp);

@@ -533,6 +587,7 @@ export class CompleteAttemptService extends BaseService {
       executionRetry,
       supportsLazyAttempts: taskRunAttempt.backgroundWorker.supportsLazyAttempts,
       executionRetryInferred,
+      forceRequeue,
     });

     return "RETRIED";
@@ -634,3 +689,12 @@ async function findAttempt(prismaClient: PrismaClientOrTransaction, friendlyId:
     },
   });
 }
+
+function isOOMError(error: TaskRunError) {
+  if (error.type !== "INTERNAL_ERROR") return false;
+  if (error.code === "TASK_PROCESS_OOM_KILLED" || error.code === "TASK_PROCESS_MAYBE_OOM_KILLED") {
+    return true;
+  }
+
+  return false;
+}
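For illustration, the shapes isOOMError accepts and rejects. Both OOM error codes come from the diff itself; the STRING_ERROR shape is an assumption about the TaskRunError union, used here only as a non-internal counterexample.

isOOMError({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_OOM_KILLED" }); // true
isOOMError({ type: "INTERNAL_ERROR", code: "TASK_PROCESS_MAYBE_OOM_KILLED" }); // true
isOOMError({ type: "STRING_ERROR", raw: "allocation failed" }); // false (not an internal error)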
