Skip to content

Commit 6026810

Browse files
committed
cut resume payload queries in half
1 parent 7c6af13 commit 6026810

File tree

2 files changed

+217
-109
lines changed

2 files changed

+217
-109
lines changed

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 212 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import { ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
1717
import {
1818
BackgroundWorker,
1919
BackgroundWorkerTask,
20+
Prisma,
2021
RuntimeEnvironment,
2122
TaskRun,
2223
TaskRunStatus,
@@ -998,7 +999,157 @@ export class SharedQueueConsumer {
998999
}
9991000
}
10001001

1002+
type AttemptForCompletion = Prisma.TaskRunAttemptGetPayload<{
1003+
include: {
1004+
backgroundWorker: true;
1005+
backgroundWorkerTask: true;
1006+
taskRun: {
1007+
include: {
1008+
runtimeEnvironment: {
1009+
include: {
1010+
organization: true;
1011+
project: true;
1012+
};
1013+
};
1014+
tags: true;
1015+
};
1016+
};
1017+
queue: true;
1018+
};
1019+
}>;
1020+
1021+
type AttemptForExecution = Prisma.TaskRunAttemptGetPayload<{
1022+
include: {
1023+
backgroundWorker: true;
1024+
backgroundWorkerTask: true;
1025+
runtimeEnvironment: {
1026+
include: {
1027+
organization: true;
1028+
project: true;
1029+
};
1030+
};
1031+
taskRun: {
1032+
include: {
1033+
tags: true;
1034+
batchItems: {
1035+
include: {
1036+
batchTaskRun: {
1037+
select: {
1038+
friendlyId: true;
1039+
};
1040+
};
1041+
};
1042+
};
1043+
};
1044+
};
1045+
queue: true;
1046+
};
1047+
}>;
1048+
10011049
class SharedQueueTasks {
1050+
private _completionPayloadFromAttempt(attempt: AttemptForCompletion): TaskRunExecutionResult {
1051+
const ok = attempt.status === "COMPLETED";
1052+
1053+
if (ok) {
1054+
const success: TaskRunSuccessfulExecutionResult = {
1055+
ok,
1056+
id: attempt.taskRun.friendlyId,
1057+
output: attempt.output ?? undefined,
1058+
outputType: attempt.outputType,
1059+
taskIdentifier: attempt.taskRun.taskIdentifier,
1060+
};
1061+
return success;
1062+
} else {
1063+
const failure: TaskRunFailedExecutionResult = {
1064+
ok,
1065+
id: attempt.taskRun.friendlyId,
1066+
error: attempt.error as TaskRunError,
1067+
taskIdentifier: attempt.taskRun.taskIdentifier,
1068+
};
1069+
return failure;
1070+
}
1071+
}
1072+
1073+
private async _executionFromAttempt(
1074+
attempt: AttemptForExecution,
1075+
machinePreset?: MachinePreset
1076+
): Promise<ProdTaskRunExecution> {
1077+
const { backgroundWorkerTask, taskRun, queue } = attempt;
1078+
1079+
if (!machinePreset) {
1080+
machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {});
1081+
}
1082+
1083+
const metadata = await parsePacket({
1084+
data: taskRun.metadata ?? undefined,
1085+
dataType: taskRun.metadataType,
1086+
});
1087+
1088+
const execution: ProdTaskRunExecution = {
1089+
task: {
1090+
id: backgroundWorkerTask.slug,
1091+
filePath: backgroundWorkerTask.filePath,
1092+
exportName: backgroundWorkerTask.exportName,
1093+
},
1094+
attempt: {
1095+
id: attempt.friendlyId,
1096+
number: attempt.number,
1097+
startedAt: attempt.startedAt ?? attempt.createdAt,
1098+
backgroundWorkerId: attempt.backgroundWorkerId,
1099+
backgroundWorkerTaskId: attempt.backgroundWorkerTaskId,
1100+
status: "EXECUTING" as const,
1101+
},
1102+
run: {
1103+
id: taskRun.friendlyId,
1104+
payload: taskRun.payload,
1105+
payloadType: taskRun.payloadType,
1106+
context: taskRun.context,
1107+
createdAt: taskRun.createdAt,
1108+
startedAt: taskRun.startedAt ?? taskRun.createdAt,
1109+
tags: taskRun.tags.map((tag) => tag.name),
1110+
isTest: taskRun.isTest,
1111+
idempotencyKey: taskRun.idempotencyKey ?? undefined,
1112+
durationMs: taskRun.usageDurationMs,
1113+
costInCents: taskRun.costInCents,
1114+
baseCostInCents: taskRun.baseCostInCents,
1115+
metadata,
1116+
maxDuration: taskRun.maxDurationInSeconds ?? undefined,
1117+
},
1118+
queue: {
1119+
id: queue.friendlyId,
1120+
name: queue.name,
1121+
},
1122+
environment: {
1123+
id: attempt.runtimeEnvironment.id,
1124+
slug: attempt.runtimeEnvironment.slug,
1125+
type: attempt.runtimeEnvironment.type,
1126+
},
1127+
organization: {
1128+
id: attempt.runtimeEnvironment.organization.id,
1129+
slug: attempt.runtimeEnvironment.organization.slug,
1130+
name: attempt.runtimeEnvironment.organization.title,
1131+
},
1132+
project: {
1133+
id: attempt.runtimeEnvironment.project.id,
1134+
ref: attempt.runtimeEnvironment.project.externalRef,
1135+
slug: attempt.runtimeEnvironment.project.slug,
1136+
name: attempt.runtimeEnvironment.project.name,
1137+
},
1138+
batch:
1139+
taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun
1140+
? { id: taskRun.batchItems[0].batchTaskRun.friendlyId }
1141+
: undefined,
1142+
worker: {
1143+
id: attempt.backgroundWorkerId,
1144+
contentHash: attempt.backgroundWorker.contentHash,
1145+
version: attempt.backgroundWorker.version,
1146+
},
1147+
machine: machinePreset,
1148+
};
1149+
1150+
return execution;
1151+
}
1152+
10021153
async getCompletionPayloadFromAttempt(id: string): Promise<TaskRunExecutionResult | undefined> {
10031154
const attempt = await prisma.taskRunAttempt.findFirst({
10041155
where: {
@@ -1030,26 +1181,7 @@ class SharedQueueTasks {
10301181
return;
10311182
}
10321183

1033-
const ok = attempt.status === "COMPLETED";
1034-
1035-
if (ok) {
1036-
const success: TaskRunSuccessfulExecutionResult = {
1037-
ok,
1038-
id: attempt.taskRun.friendlyId,
1039-
output: attempt.output ?? undefined,
1040-
outputType: attempt.outputType,
1041-
taskIdentifier: attempt.taskRun.taskIdentifier,
1042-
};
1043-
return success;
1044-
} else {
1045-
const failure: TaskRunFailedExecutionResult = {
1046-
ok,
1047-
id: attempt.taskRun.friendlyId,
1048-
error: attempt.error as TaskRunError,
1049-
taskIdentifier: attempt.taskRun.taskIdentifier,
1050-
};
1051-
return failure;
1052-
}
1184+
return this._completionPayloadFromAttempt(attempt);
10531185
}
10541186

10551187
async getExecutionPayloadFromAttempt({
@@ -1162,78 +1294,10 @@ class SharedQueueTasks {
11621294
},
11631295
});
11641296
}
1165-
1166-
const { backgroundWorkerTask, taskRun, queue } = attempt;
1297+
const { backgroundWorkerTask, taskRun } = attempt;
11671298

11681299
const machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {});
1169-
1170-
const metadata = await parsePacket({
1171-
data: taskRun.metadata ?? undefined,
1172-
dataType: taskRun.metadataType,
1173-
});
1174-
1175-
const execution: ProdTaskRunExecution = {
1176-
task: {
1177-
id: backgroundWorkerTask.slug,
1178-
filePath: backgroundWorkerTask.filePath,
1179-
exportName: backgroundWorkerTask.exportName,
1180-
},
1181-
attempt: {
1182-
id: attempt.friendlyId,
1183-
number: attempt.number,
1184-
startedAt: attempt.startedAt ?? attempt.createdAt,
1185-
backgroundWorkerId: attempt.backgroundWorkerId,
1186-
backgroundWorkerTaskId: attempt.backgroundWorkerTaskId,
1187-
status: "EXECUTING" as const,
1188-
},
1189-
run: {
1190-
id: taskRun.friendlyId,
1191-
payload: taskRun.payload,
1192-
payloadType: taskRun.payloadType,
1193-
context: taskRun.context,
1194-
createdAt: taskRun.createdAt,
1195-
startedAt: taskRun.startedAt ?? taskRun.createdAt,
1196-
tags: taskRun.tags.map((tag) => tag.name),
1197-
isTest: taskRun.isTest,
1198-
idempotencyKey: taskRun.idempotencyKey ?? undefined,
1199-
durationMs: taskRun.usageDurationMs,
1200-
costInCents: taskRun.costInCents,
1201-
baseCostInCents: taskRun.baseCostInCents,
1202-
metadata,
1203-
maxDuration: taskRun.maxDurationInSeconds ?? undefined,
1204-
},
1205-
queue: {
1206-
id: queue.friendlyId,
1207-
name: queue.name,
1208-
},
1209-
environment: {
1210-
id: attempt.runtimeEnvironment.id,
1211-
slug: attempt.runtimeEnvironment.slug,
1212-
type: attempt.runtimeEnvironment.type,
1213-
},
1214-
organization: {
1215-
id: attempt.runtimeEnvironment.organization.id,
1216-
slug: attempt.runtimeEnvironment.organization.slug,
1217-
name: attempt.runtimeEnvironment.organization.title,
1218-
},
1219-
project: {
1220-
id: attempt.runtimeEnvironment.project.id,
1221-
ref: attempt.runtimeEnvironment.project.externalRef,
1222-
slug: attempt.runtimeEnvironment.project.slug,
1223-
name: attempt.runtimeEnvironment.project.name,
1224-
},
1225-
batch:
1226-
taskRun.batchItems[0] && taskRun.batchItems[0].batchTaskRun
1227-
? { id: taskRun.batchItems[0].batchTaskRun.friendlyId }
1228-
: undefined,
1229-
worker: {
1230-
id: attempt.backgroundWorkerId,
1231-
contentHash: attempt.backgroundWorker.contentHash,
1232-
version: attempt.backgroundWorker.version,
1233-
},
1234-
machine: machinePreset,
1235-
};
1236-
1300+
const execution = await this._executionFromAttempt(attempt, machinePreset);
12371301
const variables = await this.#buildEnvironmentVariables(
12381302
attempt.runtimeEnvironment,
12391303
taskRun.id,
@@ -1252,6 +1316,64 @@ class SharedQueueTasks {
12521316
return payload;
12531317
}
12541318

1319+
async getResumePayload(attemptId: string): Promise<
1320+
| {
1321+
execution: ProdTaskRunExecution;
1322+
completion: TaskRunExecutionResult;
1323+
}
1324+
| undefined
1325+
> {
1326+
const attempt = await prisma.taskRunAttempt.findFirst({
1327+
where: {
1328+
id: attemptId,
1329+
},
1330+
include: {
1331+
backgroundWorker: true,
1332+
backgroundWorkerTask: true,
1333+
runtimeEnvironment: {
1334+
include: {
1335+
organization: true,
1336+
project: true,
1337+
},
1338+
},
1339+
taskRun: {
1340+
include: {
1341+
runtimeEnvironment: {
1342+
include: {
1343+
organization: true,
1344+
project: true,
1345+
},
1346+
},
1347+
tags: true,
1348+
batchItems: {
1349+
include: {
1350+
batchTaskRun: {
1351+
select: {
1352+
friendlyId: true,
1353+
},
1354+
},
1355+
},
1356+
},
1357+
},
1358+
},
1359+
queue: true,
1360+
},
1361+
});
1362+
1363+
if (!attempt) {
1364+
logger.error("getExecutionPayloadFromAttempt: No attempt found", { id: attemptId });
1365+
return;
1366+
}
1367+
1368+
const execution = await this._executionFromAttempt(attempt);
1369+
const completion = this._completionPayloadFromAttempt(attempt);
1370+
1371+
return {
1372+
execution,
1373+
completion,
1374+
};
1375+
}
1376+
12551377
async getLatestExecutionPayloadFromRun(
12561378
id: string,
12571379
setToExecuting?: boolean,

apps/webapp/app/v3/services/resumeAttempt.server.ts

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -182,30 +182,16 @@ export class ResumeAttemptService extends BaseService {
182182
completedRunId: completedAttempt.taskRunId,
183183
});
184184

185-
const completion = await sharedQueueTasks.getCompletionPayloadFromAttempt(
186-
completedAttempt.id
187-
);
185+
const resumePayload = await sharedQueueTasks.getResumePayload(completedAttempt.id);
188186

189-
if (!completion) {
190-
logger.error("Failed to get completion payload");
187+
if (!resumePayload) {
188+
logger.error("Failed to get resume payload");
191189
await marqs?.acknowledgeMessage(attempt.taskRunId);
192190
return;
193191
}
194192

195-
completions.push(completion);
196-
197-
const executionPayload = await sharedQueueTasks.getExecutionPayloadFromAttempt({
198-
id: completedAttempt.id,
199-
skipStatusChecks: true, // already checked when getting the completion
200-
});
201-
202-
if (!executionPayload) {
203-
logger.error("Failed to get execution payload");
204-
await marqs?.acknowledgeMessage(attempt.taskRunId);
205-
return;
206-
}
207-
208-
executions.push(executionPayload.execution);
193+
completions.push(resumePayload.completion);
194+
executions.push(resumePayload.execution);
209195
}
210196

211197
await this.#setPostResumeStatuses(attempt);

0 commit comments

Comments
 (0)