Skip to content

Commit f7d32b8

Browse files
authored
Free up concurrency when using triggerAndWait. Improved errors on runs (#1272)
* Add an error to the final attempt if there isn’t one
* Improved the checkpointResumer test task
* Using triggerAndWait or batchTriggerAndWait frees up concurrency. Normally it frees up just env and org concurrency. If it’s a recursive task (e.g. a task calling itself) then it will free up the run concurrency too.
* Removed the filepath and export name from attempt spans
* Run inspector: only show an error if the run is in a finished state
1 parent 0979a52 commit f7d32b8

File tree

7 files changed

+148
-33
lines changed

7 files changed

+148
-33
lines changed

.changeset/fast-ladybugs-eat.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Removed the folder/filepath from Attempt spans

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -135,20 +135,24 @@ export class SpanPresenter extends BasePresenter {
135135
return;
136136
}
137137

138-
const finishedAttempt = await this._replica.taskRunAttempt.findFirst({
139-
select: {
140-
output: true,
141-
outputType: true,
142-
error: true,
143-
},
144-
where: {
145-
status: { in: FINAL_ATTEMPT_STATUSES },
146-
taskRunId: run.id,
147-
},
148-
orderBy: {
149-
createdAt: "desc",
150-
},
151-
});
138+
const isFinished = isFinalRunStatus(run.status);
139+
140+
const finishedAttempt = isFinished
141+
? await this._replica.taskRunAttempt.findFirst({
142+
select: {
143+
output: true,
144+
outputType: true,
145+
error: true,
146+
},
147+
where: {
148+
status: { in: FINAL_ATTEMPT_STATUSES },
149+
taskRunId: run.id,
150+
},
151+
orderBy: {
152+
createdAt: "desc",
153+
},
154+
})
155+
: null;
152156

153157
const output =
154158
finishedAttempt === null
@@ -258,7 +262,7 @@ export class SpanPresenter extends BasePresenter {
258262
costInCents: run.costInCents,
259263
totalCostInCents: run.costInCents + run.baseCostInCents,
260264
usageDurationMs: run.usageDurationMs,
261-
isFinished: isFinalRunStatus(run.status),
265+
isFinished,
262266
isRunning: RUNNING_STATUSES.includes(run.status),
263267
payload,
264268
payloadType: run.payloadType,

apps/webapp/app/v3/failedTaskRun.server.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
1+
import { sanitizeError, TaskRunFailedExecutionResult } from "@trigger.dev/core/v3";
22
import { logger } from "~/services/logger.server";
33
import { createExceptionPropertiesFromError, eventRepository } from "./eventRepository.server";
44
import { BaseService } from "./services/baseService.server";
@@ -44,6 +44,24 @@ export class FailedTaskRunService extends BaseService {
4444
completedAt: new Date(),
4545
});
4646

47+
// Get the final attempt and add the error to it, if it's not already set
48+
const finalAttempt = await this._prisma.taskRunAttempt.findFirst({
49+
where: {
50+
taskRunId: taskRun.id,
51+
},
52+
orderBy: { id: "desc" },
53+
});
54+
55+
if (finalAttempt && !finalAttempt.error) {
56+
// Haven't set the status because the attempt might still be running
57+
await this._prisma.taskRunAttempt.update({
58+
where: { id: finalAttempt.id },
59+
data: {
60+
error: sanitizeError(completion.error),
61+
},
62+
});
63+
}
64+
4765
// Now we need to "complete" the task run event/span
4866
await eventRepository.completeEvent(taskRun.spanId, {
4967
endTime: new Date(),

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,60 @@ export class MarQS {
542542
);
543543
}
544544

545+
public async releaseConcurrency(messageId: string, releaseForRun: boolean = false) {
546+
return this.#trace(
547+
"releaseConcurrency",
548+
async (span) => {
549+
span.setAttributes({
550+
[SemanticAttributes.MESSAGE_ID]: messageId,
551+
});
552+
553+
const message = await this.readMessage(messageId);
554+
555+
if (!message) {
556+
return;
557+
}
558+
559+
span.setAttributes({
560+
[SemanticAttributes.QUEUE]: message.queue,
561+
[SemanticAttributes.MESSAGE_ID]: message.messageId,
562+
[SemanticAttributes.CONCURRENCY_KEY]: message.concurrencyKey,
563+
[SemanticAttributes.PARENT_QUEUE]: message.parentQueue,
564+
});
565+
566+
const concurrencyKey = this.keys.currentConcurrencyKeyFromQueue(message.queue);
567+
const envConcurrencyKey = this.keys.envCurrentConcurrencyKeyFromQueue(message.queue);
568+
const orgConcurrencyKey = this.keys.orgCurrentConcurrencyKeyFromQueue(message.queue);
569+
570+
logger.debug("Calling releaseConcurrency", {
571+
messageId,
572+
queue: message.queue,
573+
concurrencyKey,
574+
envConcurrencyKey,
575+
orgConcurrencyKey,
576+
service: this.name,
577+
releaseForRun,
578+
});
579+
580+
return this.redis.releaseConcurrency(
581+
//don't release the concurrency for the run unless releaseForRun is set, it breaks concurrencyLimits
582+
releaseForRun ? concurrencyKey : "",
583+
envConcurrencyKey,
584+
orgConcurrencyKey,
585+
message.messageId
586+
);
587+
},
588+
{
589+
kind: SpanKind.CONSUMER,
590+
attributes: {
591+
[SEMATTRS_MESSAGING_OPERATION]: "releaseConcurrency",
592+
[SEMATTRS_MESSAGE_ID]: messageId,
593+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
594+
},
595+
}
596+
);
597+
}
598+
545599
async #trace<T>(
546600
name: string,
547601
fn: (span: Span) => Promise<T>,
@@ -1488,6 +1542,24 @@ end
14881542
`,
14891543
});
14901544

1545+
this.redis.defineCommand("releaseConcurrency", {
1546+
numberOfKeys: 3,
1547+
lua: `
1548+
local concurrencyKey = KEYS[1]
1549+
local envCurrentConcurrencyKey = KEYS[2]
1550+
local orgCurrentConcurrencyKey = KEYS[3]
1551+
1552+
local messageId = ARGV[1]
1553+
1554+
-- Update the concurrency keys
1555+
if concurrencyKey ~= "" then
1556+
redis.call('SREM', concurrencyKey, messageId)
1557+
end
1558+
redis.call('SREM', envCurrentConcurrencyKey, messageId)
1559+
redis.call('SREM', orgCurrentConcurrencyKey, messageId)
1560+
`,
1561+
});
1562+
14911563
this.redis.defineCommand("heartbeatMessage", {
14921564
numberOfKeys: 1,
14931565
lua: `
@@ -1699,6 +1771,14 @@ declare module "ioredis" {
16991771
callback?: Callback<void>
17001772
): Result<void, Context>;
17011773

1774+
releaseConcurrency(
1775+
concurrencyKey: string,
1776+
envConcurrencyKey: string,
1777+
orgConcurrencyKey: string,
1778+
messageId: string,
1779+
callback?: Callback<void>
1780+
): Result<void, Context>;
1781+
17021782
heartbeatMessage(
17031783
visibilityQueue: string,
17041784
messageId: string,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ export class TriggerTaskService extends BaseService {
107107
select: {
108108
id: true,
109109
status: true,
110+
taskIdentifier: true,
110111
},
111112
},
112113
},
@@ -143,6 +144,7 @@ export class TriggerTaskService extends BaseService {
143144
select: {
144145
id: true,
145146
status: true,
147+
taskIdentifier: true,
146148
},
147149
},
148150
},
@@ -391,6 +393,20 @@ export class TriggerTaskService extends BaseService {
391393
this._prisma
392394
);
393395

396+
//release the concurrency for the env and org, if part of a (batch)triggerAndWait
397+
if (dependentAttempt) {
398+
const isSameTask = dependentAttempt.taskRun.taskIdentifier === taskId;
399+
await marqs?.releaseConcurrency(dependentAttempt.taskRun.id, isSameTask);
400+
}
401+
if (dependentBatchRun?.dependentTaskAttempt) {
402+
const isSameTask =
403+
dependentBatchRun.dependentTaskAttempt.taskRun.taskIdentifier === taskId;
404+
await marqs?.releaseConcurrency(
405+
dependentBatchRun.dependentTaskAttempt.taskRun.id,
406+
isSameTask
407+
);
408+
}
409+
394410
if (!run) {
395411
return;
396412
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -202,17 +202,6 @@ export class TaskExecutor {
202202
kind: SpanKind.CONSUMER,
203203
attributes: {
204204
[SemanticInternalAttributes.STYLE_ICON]: "attempt",
205-
...accessoryAttributes({
206-
items: [
207-
{
208-
text: ctx.task.filePath,
209-
},
210-
{
211-
text: `${ctx.task.exportName}.run()`,
212-
},
213-
],
214-
style: "codepath",
215-
}),
216205
},
217206
},
218207
this._tracer.extractContext(traceContext)

references/v3-catalog/src/trigger/checkpoints.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,16 @@ export const checkpointBatchResumer = task({
1818
/** Test that checkpoints and resuming works if the checkpoint isn't created before the resume */
1919
export const checkpointResumer = task({
2020
id: "checkpoint-resume",
21+
queue: {
22+
concurrencyLimit: 1,
23+
},
2124
run: async ({ count = 1 }: Payload) => {
22-
await noop.triggerAndWait();
23-
logger.info(`Successfully 1/3 resumed`);
24-
await noop.triggerAndWait();
25-
logger.info(`Successfully 2/3 resumed`);
26-
await noop.triggerAndWait();
27-
logger.info(`Successfully 3/3 resumed`);
25+
logger.info(`Starting ${count} runs`);
26+
27+
for (let i = 0; i < count; i++) {
28+
await noop.triggerAndWait();
29+
logger.info(`Successfully ${i + 1}/${count} resumed`);
30+
}
2831
},
2932
});
3033

0 commit comments

Comments
 (0)