Skip to content

Commit 73214d7

Browse files
authored
Prevent large outputs from overwriting each other (#1971)
* large outputs now get unique keys again * replace other attempt id occurences * add changeset
1 parent 853a7ef commit 73214d7

File tree

6 files changed

+36
-9
lines changed

6 files changed

+36
-9
lines changed

.changeset/spotty-pants-wink.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"trigger.dev": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Prevent large outputs from overwriting each other

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,7 @@ export class DevRunController {
623623
}).initialize();
624624

625625
logger.debug("executing task run process", {
626-
attemptId: execution.attempt.id,
626+
attemptNumber: execution.attempt.number,
627627
runId: execution.run.id,
628628
});
629629

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
AnyOnStartHookFunction,
88
AnyOnSuccessHookFunction,
99
apiClientManager,
10+
attemptKey,
1011
clock,
1112
ExecutorToWorkerMessageCatalog,
1213
type HandleErrorFunction,
@@ -546,7 +547,7 @@ const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
546547
for await (const _ of setInterval(heartbeatInterval)) {
547548
if (_isRunning && _execution) {
548549
try {
549-
await zodIpc.send("TASK_HEARTBEAT", { id: _execution.attempt.id });
550+
await zodIpc.send("TASK_HEARTBEAT", { id: attemptKey(_execution) });
550551
} catch (err) {
551552
logError("Failed to send HEARTBEAT message", err);
552553
}

packages/cli-v3/src/executions/taskRunProcess.ts

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import {
2+
attemptKey,
23
CompletedWaitpoint,
34
ExecutorToWorkerMessageCatalog,
45
MachinePresetResources,
@@ -164,15 +165,17 @@ export class TaskRunProcess {
164165
TASK_RUN_COMPLETED: async (message) => {
165166
const { result, execution } = message;
166167

167-
const promiseStatus = this._attemptStatuses.get(execution.attempt.id);
168+
const key = attemptKey(execution);
169+
170+
const promiseStatus = this._attemptStatuses.get(key);
168171

169172
if (promiseStatus !== "PENDING") {
170173
return;
171174
}
172175

173-
this._attemptStatuses.set(execution.attempt.id, "RESOLVED");
176+
this._attemptStatuses.set(key, "RESOLVED");
174177

175-
const attemptPromise = this._attemptPromises.get(execution.attempt.id);
178+
const attemptPromise = this._attemptPromises.get(key);
176179

177180
if (!attemptPromise) {
178181
return;
@@ -229,10 +232,12 @@ export class TaskRunProcess {
229232
rejecter = reject;
230233
});
231234

232-
this._attemptStatuses.set(params.payload.execution.attempt.id, "PENDING");
235+
const key = attemptKey(params.payload.execution);
236+
237+
this._attemptStatuses.set(key, "PENDING");
233238

234239
// @ts-expect-error - We know that the resolver and rejecter are defined
235-
this._attemptPromises.set(params.payload.execution.attempt.id, { resolver, rejecter });
240+
this._attemptPromises.set(key, { resolver, rejecter });
236241

237242
const { execution, traceContext, metrics } = params.payload;
238243

packages/core/src/v3/idempotencyKeys.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ function injectScope(scope: "run" | "attempt" | "global"): string[] {
105105
}
106106
case "attempt": {
107107
if (taskContext?.ctx) {
108-
return [taskContext.ctx.attempt.id];
108+
return [taskContext.ctx.run.id, taskContext.ctx.attempt.number.toString()];
109109
}
110110
break;
111111
}
@@ -125,3 +125,17 @@ async function generateIdempotencyKey(keyMaterial: string[]) {
125125
.map((byte) => byte.toString(16).padStart(2, "0"))
126126
.join("");
127127
}
128+
129+
type AttemptKeyMaterial = {
130+
run: {
131+
id: string;
132+
};
133+
attempt: {
134+
number: number;
135+
};
136+
};
137+
138+
/** Creates a unique key for each attempt. */
139+
export function attemptKey(ctx: AttemptKeyMaterial): string {
140+
return `${ctx.run.id}-${ctx.attempt.number}`;
141+
}

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from "../errors.js";
1313
import {
1414
accessoryAttributes,
15+
attemptKey,
1516
flattenAttributes,
1617
lifecycleHooks,
1718
runMetadata,
@@ -235,7 +236,7 @@ export class TaskExecutor {
235236
const [exportError, finalOutput] = await tryCatch(
236237
conditionallyExportPacket(
237238
stringifiedOutput,
238-
`${execution.attempt.id}/output`,
239+
`${attemptKey(ctx)}/output`,
239240
this._tracer
240241
)
241242
);

0 commit comments

Comments
 (0)