Skip to content

Commit e783be1

Browse files
committed
Run metadata
1 parent d361e24 commit e783be1

File tree

24 files changed

+556
-12
lines changed

24 files changed

+556
-12
lines changed

.changeset/tasty-rats-rhyme.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
Add Run metadata to allow for storing up to 8KB of data on a run and update it during the run

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ const EnvironmentSchema = z.object({
212212
MAXIMUM_TRACE_SUMMARY_VIEW_COUNT: z.coerce.number().int().default(25_000),
213213
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
214214
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
215+
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(8_000), // 8KB
215216
});
216217

217218
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ const commonRunSelect = {
2929
completedAt: true,
3030
expiredAt: true,
3131
delayUntil: true,
32+
metadata: true,
33+
metadataType: true,
3234
ttl: true,
3335
tags: true,
3436
costInCents: true,
@@ -157,10 +159,8 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
157159
}
158160
}
159161

160-
const apiStatus = ApiRetrieveRunPresenter.apiStatusFromRunStatus(taskRun.status);
161-
162162
return {
163-
...createCommonRunStructure(taskRun),
163+
...(await createCommonRunStructure(taskRun)),
164164
payload: $payload,
165165
payloadPresignedUrl: $payloadPresignedUrl,
166166
output: $output,
@@ -191,11 +191,15 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
191191
error: ApiRetrieveRunPresenter.apiErrorFromError(a.error),
192192
})),
193193
relatedRuns: {
194-
root: taskRun.rootTaskRun ? createCommonRunStructure(taskRun.rootTaskRun) : undefined,
194+
root: taskRun.rootTaskRun
195+
? await createCommonRunStructure(taskRun.rootTaskRun)
196+
: undefined,
195197
parent: taskRun.parentTaskRun
196-
? createCommonRunStructure(taskRun.parentTaskRun)
198+
? await createCommonRunStructure(taskRun.parentTaskRun)
197199
: undefined,
198-
children: taskRun.childRuns.map((r) => createCommonRunStructure(r)),
200+
children: await Promise.all(
201+
taskRun.childRuns.map(async (r) => await createCommonRunStructure(r))
202+
),
199203
},
200204
};
201205
});
@@ -329,7 +333,12 @@ export class ApiRetrieveRunPresenter extends BasePresenter {
329333
}
330334
}
331335

332-
function createCommonRunStructure(run: CommonRelatedRun) {
336+
async function createCommonRunStructure(run: CommonRelatedRun) {
337+
const metadata = await parsePacket({
338+
data: run.metadata ?? undefined,
339+
dataType: run.metadataType,
340+
});
341+
333342
return {
334343
id: run.friendlyId,
335344
taskIdentifier: run.taskIdentifier,
@@ -354,6 +363,7 @@ function createCommonRunStructure(run: CommonRelatedRun) {
354363
...ApiRetrieveRunPresenter.apiBooleanHelpersFromTaskRunStatus(run.status),
355364
triggerFunction: resolveTriggerFunction(run),
356365
batchId: run.batch?.friendlyId,
366+
metadata,
357367
};
358368
}
359369

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { MachinePresetName, prettyPrintPacket, TaskRunError } from "@trigger.dev/core/v3";
1+
import {
2+
MachinePresetName,
3+
parsePacket,
4+
prettyPrintPacket,
5+
TaskRunError,
6+
} from "@trigger.dev/core/v3";
27
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
38
import { eventRepository } from "~/v3/eventRepository.server";
49
import { machinePresetFromName } from "~/v3/machinePresets.server";
@@ -113,6 +118,8 @@ export class SpanPresenter extends BasePresenter {
113118
},
114119
payload: true,
115120
payloadType: true,
121+
metadata: true,
122+
metadataType: true,
116123
maxAttempts: true,
117124
project: {
118125
include: {
@@ -185,6 +192,11 @@ export class SpanPresenter extends BasePresenter {
185192

186193
const span = await eventRepository.getSpan(spanId, run.traceId);
187194

195+
const metadata = await parsePacket({
196+
data: run.metadata ?? undefined,
197+
dataType: run.metadataType,
198+
});
199+
188200
const context = {
189201
task: {
190202
id: run.taskIdentifier,
@@ -203,6 +215,7 @@ export class SpanPresenter extends BasePresenter {
203215
baseCostInCents: run.baseCostInCents,
204216
maxAttempts: run.maxAttempts ?? undefined,
205217
version: run.lockedToVersion?.version,
218+
metadata,
206219
},
207220
queue: {
208221
name: run.queue,
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { AddTagsRequestBody, parsePacket, UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
3+
import { z } from "zod";
4+
import { prisma } from "~/db.server";
5+
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
6+
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { handleMetadataPacket } from "~/utils/packets";
8+
import { ServiceValidationError } from "~/v3/services/baseService.server";
9+
import { FINAL_RUN_STATUSES } from "~/v3/taskStatus";
10+
11+
const ParamsSchema = z.object({
12+
runId: z.string(),
13+
});
14+
15+
export async function action({ request, params }: ActionFunctionArgs) {
16+
// Ensure this is a POST request
17+
if (request.method.toUpperCase() !== "PUT") {
18+
return { status: 405, body: "Method Not Allowed" };
19+
}
20+
21+
// Authenticate the request
22+
const authenticationResult = await authenticateApiRequest(request);
23+
if (!authenticationResult) {
24+
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
25+
}
26+
27+
const parsedParams = ParamsSchema.safeParse(params);
28+
if (!parsedParams.success) {
29+
return json(
30+
{ error: "Invalid request parameters", issues: parsedParams.error.issues },
31+
{ status: 400 }
32+
);
33+
}
34+
35+
try {
36+
const anyBody = await request.json();
37+
38+
const body = UpdateMetadataRequestBody.safeParse(anyBody);
39+
40+
if (!body.success) {
41+
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
42+
}
43+
44+
const metadataPacket = handleMetadataPacket(
45+
body.data.metadata,
46+
body.data.metadataType ?? "application/json"
47+
);
48+
49+
if (!metadataPacket) {
50+
return json({ error: "Invalid metadata" }, { status: 400 });
51+
}
52+
53+
const taskRun = await prisma.taskRun.findFirst({
54+
where: {
55+
friendlyId: parsedParams.data.runId,
56+
runtimeEnvironmentId: authenticationResult.environment.id,
57+
},
58+
select: {
59+
status: true,
60+
},
61+
});
62+
63+
if (!taskRun) {
64+
return json({ error: "Task Run not found" }, { status: 404 });
65+
}
66+
67+
if (FINAL_RUN_STATUSES.includes(taskRun.status)) {
68+
return json({ error: "Cannot update metadata for a completed run" }, { status: 400 });
69+
}
70+
71+
await prisma.taskRun.update({
72+
where: {
73+
friendlyId: parsedParams.data.runId,
74+
runtimeEnvironmentId: authenticationResult.environment.id,
75+
},
76+
data: {
77+
metadata: metadataPacket?.data,
78+
metadataType: metadataPacket?.dataType,
79+
},
80+
});
81+
82+
const parsedPacket = await parsePacket(metadataPacket);
83+
84+
return json({ metadata: parsedPacket }, { status: 200 });
85+
} catch (error) {
86+
if (error instanceof ServiceValidationError) {
87+
return json({ error: error.message }, { status: error.status ?? 422 });
88+
} else {
89+
return json(
90+
{ error: error instanceof Error ? error.message : "Internal Server Error" },
91+
{ status: 500 }
92+
);
93+
}
94+
}
95+
}

apps/webapp/app/routes/resources.runs.$runParam.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import { LoaderFunctionArgs } from "@remix-run/server-runtime";
2-
import { MachinePresetName, prettyPrintPacket, TaskRunError } from "@trigger.dev/core/v3";
2+
import {
3+
MachinePresetName,
4+
parsePacket,
5+
prettyPrintPacket,
6+
TaskRunError,
7+
} from "@trigger.dev/core/v3";
38
import { typedjson, UseDataFunctionReturn } from "remix-typedjson";
49
import { RUNNING_STATUSES } from "~/components/runs/v3/TaskRunStatus";
510
import { $replica } from "~/db.server";
@@ -72,6 +77,8 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
7277
},
7378
payload: true,
7479
payloadType: true,
80+
metadata: true,
81+
metadataType: true,
7582
maxAttempts: true,
7683
project: {
7784
include: {
@@ -151,6 +158,11 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
151158
}
152159
}
153160

161+
const metadata = await parsePacket({
162+
data: run.metadata ?? undefined,
163+
dataType: run.metadataType,
164+
});
165+
154166
const context = {
155167
task: {
156168
id: run.taskIdentifier,
@@ -169,6 +181,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
169181
baseCostInCents: run.baseCostInCents,
170182
maxAttempts: run.maxAttempts ?? undefined,
171183
version: run.lockedToVersion?.version,
184+
metadata,
172185
},
173186
queue: {
174187
name: run.queue,

apps/webapp/app/utils/packets.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import { IOPacket } from "@trigger.dev/core/v3/utils/ioSerialization";
2+
import { env } from "~/env.server";
3+
import { ServiceValidationError } from "~/v3/services/baseService.server";
4+
5+
export class MetadataTooLargeError extends ServiceValidationError {
6+
constructor(message: string) {
7+
super(message, 413);
8+
this.name = "MetadataTooLargeError";
9+
}
10+
}
11+
12+
export function handleMetadataPacket(metadata: any, metadataType: string): IOPacket | undefined {
13+
let metadataPacket: IOPacket | undefined = undefined;
14+
15+
if (typeof metadata === "string") {
16+
metadataPacket = { data: metadata, dataType: metadataType };
17+
}
18+
19+
if (metadataType === "application/json") {
20+
metadataPacket = { data: JSON.stringify(metadata), dataType: "application/json" };
21+
}
22+
23+
if (!metadataPacket || !metadataPacket.data) {
24+
return;
25+
}
26+
27+
const byteLength = Buffer.byteLength(metadataPacket.data, "utf8");
28+
29+
if (byteLength > env.TASK_RUN_METADATA_MAXIMUM_SIZE) {
30+
throw new MetadataTooLargeError(
31+
`Metadata exceeds maximum size of ${env.TASK_RUN_METADATA_MAXIMUM_SIZE} bytes`
32+
);
33+
}
34+
35+
return metadataPacket;
36+
}

apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
TaskRunExecutionResult,
1010
TaskRunFailedExecutionResult,
1111
TaskRunSuccessfulExecutionResult,
12+
parsePacket,
1213
serverWebsocketMessages,
1314
} from "@trigger.dev/core/v3";
1415
import { ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
@@ -1033,6 +1034,11 @@ class SharedQueueTasks {
10331034

10341035
const machinePreset = machinePresetFromConfig(backgroundWorkerTask.machineConfig ?? {});
10351036

1037+
const metadata = await parsePacket({
1038+
data: taskRun.metadata ?? undefined,
1039+
dataType: taskRun.metadataType,
1040+
});
1041+
10361042
const execution: ProdTaskRunExecution = {
10371043
task: {
10381044
id: backgroundWorkerTask.slug,
@@ -1060,6 +1066,7 @@ class SharedQueueTasks {
10601066
durationMs: taskRun.usageDurationMs,
10611067
costInCents: taskRun.costInCents,
10621068
baseCostInCents: taskRun.baseCostInCents,
1069+
metadata,
10631070
},
10641071
queue: {
10651072
id: queue.friendlyId,

apps/webapp/app/v3/services/createTaskRunAttempt.server.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { TaskRunExecution } from "@trigger.dev/core/v3";
1+
import { parsePacket, TaskRunExecution } from "@trigger.dev/core/v3";
22
import { $transaction, PrismaClientOrTransaction, prisma } from "~/db.server";
33
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { logger } from "~/services/logger.server";
@@ -157,6 +157,11 @@ export class CreateTaskRunAttemptService extends BaseService {
157157

158158
const machinePreset = machinePresetFromConfig(taskRun.lockedBy.machineConfig ?? {});
159159

160+
const metadata = await parsePacket({
161+
data: taskRun.metadata ?? undefined,
162+
dataType: taskRun.metadataType,
163+
});
164+
160165
const execution: TaskRunExecution = {
161166
task: {
162167
id: taskRun.lockedBy.slug,
@@ -186,6 +191,7 @@ export class CreateTaskRunAttemptService extends BaseService {
186191
baseCostInCents: taskRun.baseCostInCents,
187192
maxAttempts: taskRun.maxAttempts ?? undefined,
188193
version: taskRun.lockedBy.worker.version,
194+
metadata,
189195
},
190196
queue: {
191197
id: queue.friendlyId,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { logger } from "~/services/logger.server";
2020
import { isFinalAttemptStatus, isFinalRunStatus } from "../taskStatus";
2121
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2222
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
23+
import { handleMetadataPacket } from "~/utils/packets";
2324

2425
export type TriggerTaskServiceOptions = {
2526
idempotencyKey?: string;
@@ -99,6 +100,13 @@ export class TriggerTaskService extends BaseService {
99100
environment
100101
);
101102

103+
const metadataPacket = body.options?.metadata
104+
? handleMetadataPacket(
105+
body.options?.metadata,
106+
body.options?.metadataType ?? "application/json"
107+
)
108+
: undefined;
109+
102110
const dependentAttempt = body.options?.dependentAttempt
103111
? await this._prisma.taskRunAttempt.findUnique({
104112
where: { friendlyId: body.options.dependentAttempt },
@@ -341,6 +349,8 @@ export class TriggerTaskService extends BaseService {
341349
batchId: dependentBatchRun?.id ?? parentBatchRun?.id,
342350
resumeParentOnCompletion: !!(dependentAttempt ?? dependentBatchRun),
343351
depth,
352+
metadata: metadataPacket?.data,
353+
metadataType: metadataPacket?.dataType,
344354
},
345355
});
346356

0 commit comments

Comments
 (0)