Skip to content

Commit e3d99a5

Browse files
committed
WIP parent metadata updates
1 parent 53d4d81 commit e3d99a5

File tree

14 files changed

+280
-106
lines changed

14 files changed

+280
-106
lines changed
Lines changed: 19 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,94 +1,29 @@
1-
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2-
import { parsePacket, UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
1+
import { json } from "@remix-run/server-runtime";
2+
import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
33
import { z } from "zod";
4-
import { prisma } from "~/db.server";
5-
import { authenticateApiRequest } from "~/services/apiAuth.server";
6-
import { handleMetadataPacket } from "~/utils/packets";
7-
import { ServiceValidationError } from "~/v3/services/baseService.server";
8-
import { isFinalRunStatus } from "~/v3/taskStatus";
4+
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
5+
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
96

107
const ParamsSchema = z.object({
118
runId: z.string(),
129
});
1310

14-
export async function action({ request, params }: ActionFunctionArgs) {
15-
// Ensure this is a PUT request
16-
if (request.method.toUpperCase() !== "PUT") {
17-
return json({ error: "Method not allowed" }, { status: 405, headers: { Allow: "PUT" } });
18-
}
19-
20-
// Authenticate the request
21-
const authenticationResult = await authenticateApiRequest(request);
22-
if (!authenticationResult) {
23-
return json({ error: "Invalid or Missing API Key" }, { status: 401 });
24-
}
25-
26-
const parsedParams = ParamsSchema.safeParse(params);
27-
if (!parsedParams.success) {
28-
return json(
29-
{ error: "Invalid request parameters", issues: parsedParams.error.issues },
30-
{ status: 400 }
31-
);
32-
}
33-
34-
try {
35-
const anyBody = await request.json();
36-
37-
const body = UpdateMetadataRequestBody.safeParse(anyBody);
38-
39-
if (!body.success) {
40-
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
41-
}
42-
43-
const metadataPacket = handleMetadataPacket(
44-
body.data.metadata,
45-
body.data.metadataType ?? "application/json"
46-
);
47-
48-
if (!metadataPacket) {
49-
return json({ error: "Invalid metadata" }, { status: 400 });
50-
}
51-
52-
const taskRun = await prisma.taskRun.findFirst({
53-
where: {
54-
friendlyId: parsedParams.data.runId,
55-
runtimeEnvironmentId: authenticationResult.environment.id,
56-
},
57-
select: {
58-
status: true,
59-
},
60-
});
61-
62-
if (!taskRun) {
11+
const { action } = createActionApiRoute(
12+
{
13+
params: ParamsSchema,
14+
body: UpdateMetadataRequestBody,
15+
maxContentLength: 1024 * 1024, // 1MB
16+
method: "PUT",
17+
},
18+
async ({ authentication, body, params }) => {
19+
const result = await updateMetadataService.call(authentication.environment, params.runId, body);
20+
21+
if (!result) {
6322
return json({ error: "Task Run not found" }, { status: 404 });
6423
}
6524

66-
if (isFinalRunStatus(taskRun.status)) {
67-
return json({ error: "Cannot update metadata for a completed run" }, { status: 400 });
68-
}
69-
70-
await prisma.taskRun.update({
71-
where: {
72-
friendlyId: parsedParams.data.runId,
73-
runtimeEnvironmentId: authenticationResult.environment.id,
74-
},
75-
data: {
76-
metadata: metadataPacket?.data,
77-
metadataType: metadataPacket?.dataType,
78-
},
79-
});
80-
81-
const parsedPacket = await parsePacket(metadataPacket);
82-
83-
return json({ metadata: parsedPacket }, { status: 200 });
84-
} catch (error) {
85-
if (error instanceof ServiceValidationError) {
86-
return json({ error: error.message }, { status: error.status ?? 422 });
87-
} else {
88-
return json(
89-
{ error: error instanceof Error ? error.message : "Internal Server Error" },
90-
{ status: 500 }
91-
);
92-
}
25+
return json(result, { status: 200 });
9326
}
94-
}
27+
);
28+
29+
export { action };

apps/webapp/app/routes/api.v1.tasks.batch.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,21 @@
11
import { json } from "@remix-run/server-runtime";
22
import {
3-
BatchTriggerTaskResponse,
43
BatchTriggerTaskV2RequestBody,
54
BatchTriggerTaskV2Response,
65
generateJWT,
76
} from "@trigger.dev/core/v3";
87
import { env } from "~/env.server";
8+
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
9+
import { logger } from "~/services/logger.server";
910
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
10-
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
1111
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
12+
import { ServiceValidationError } from "~/v3/services/baseService.server";
1213
import {
1314
BatchProcessingStrategy,
1415
BatchTriggerV2Service,
1516
} from "~/v3/services/batchTriggerV2.server";
16-
import { ServiceValidationError } from "~/v3/services/baseService.server";
1717
import { OutOfEntitlementError } from "~/v3/services/triggerTask.server";
18-
import { AuthenticatedEnvironment, getOneTimeUseToken } from "~/services/apiAuth.server";
19-
import { logger } from "~/services/logger.server";
20-
import { z } from "zod";
18+
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
2119

2220
const { action, loader } = createActionApiRoute(
2321
{
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import { parsePacket, UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
2+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
3+
import { handleMetadataPacket } from "~/utils/packets";
4+
import { BaseService, ServiceValidationError } from "~/v3/services/baseService.server";
5+
import { isFinalRunStatus } from "~/v3/taskStatus";
6+
7+
export class UpdateMetadataService extends BaseService {
8+
public async call(
9+
environment: AuthenticatedEnvironment,
10+
runId: string,
11+
body: UpdateMetadataRequestBody
12+
) {
13+
const metadataPacket = handleMetadataPacket(
14+
body.metadata,
15+
body.metadataType ?? "application/json"
16+
);
17+
18+
if (!metadataPacket) {
19+
throw new ServiceValidationError("Invalid metadata");
20+
}
21+
22+
const taskRun = await this._prisma.taskRun.findFirst({
23+
where: {
24+
friendlyId: runId,
25+
runtimeEnvironmentId: environment.id,
26+
},
27+
select: {
28+
id: true,
29+
status: true,
30+
parentTaskRun: {
31+
select: {
32+
id: true,
33+
status: true,
34+
},
35+
},
36+
rootTaskRun: {
37+
select: {
38+
id: true,
39+
status: true,
40+
},
41+
},
42+
},
43+
});
44+
45+
if (!taskRun) {
46+
return;
47+
}
48+
49+
if (isFinalRunStatus(taskRun.status)) {
50+
throw new ServiceValidationError("Cannot update metadata for a completed run");
51+
}
52+
53+
await this._prisma.taskRun.update({
54+
where: {
55+
id: taskRun.id,
56+
},
57+
data: {
58+
metadata: metadataPacket?.data,
59+
metadataType: metadataPacket?.dataType,
60+
},
61+
});
62+
63+
const newMetadata = await parsePacket(metadataPacket);
64+
65+
return {
66+
metadata: newMetadata,
67+
};
68+
}
69+
}
70+
71+
export const updateMetadataService = new UpdateMetadataService();

apps/webapp/app/services/routeBuilders/apiBuilder.server.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,7 @@ type ApiKeyActionRouteBuilderOptions<
384384
headers?: THeadersSchema;
385385
allowJWT?: boolean;
386386
corsStrategy?: "all" | "none";
387+
method?: "POST" | "PUT" | "DELETE" | "PATCH";
387388
authorization?: {
388389
action: AuthorizationAction;
389390
resource: (
@@ -455,6 +456,19 @@ export function createActionApiRoute<
455456
}
456457

457458
async function action({ request, params }: ActionFunctionArgs) {
459+
if (options.method) {
460+
if (request.method.toUpperCase() !== options.method) {
461+
return await wrapResponse(
462+
request,
463+
json(
464+
{ error: "Method not allowed" },
465+
{ status: 405, headers: { Allow: options.method } }
466+
),
467+
corsStrategy !== "none"
468+
);
469+
}
470+
}
471+
458472
try {
459473
const authenticationResult = await authenticateApiRequestWithFailure(request, { allowJWT });
460474

packages/core/src/v3/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export * from "./task-context-api.js";
1212
export * from "./apiClientManager-api.js";
1313
export * from "./usage-api.js";
1414
export * from "./run-metadata-api.js";
15+
export * from "./runMetadata/types.js";
1516
export * from "./wait-until-api.js";
1617
export * from "./timeout-api.js";
1718
export * from "./schemas/index.js";

packages/core/src/v3/runMetadata/index.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { DeserializedJson } from "../../schemas/json.js";
22
import { getGlobal, registerGlobal } from "../utils/globals.js";
33
import { ApiRequestOptions } from "../zodfetch.js";
44
import { NoopRunMetadataManager } from "./noopManager.js";
5-
import { RunMetadataManager } from "./types.js";
5+
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
66

77
const API_NAME = "run-metadata";
88

@@ -80,4 +80,12 @@ export class RunMetadataAPI implements RunMetadataManager {
8080
flush(requestOptions?: ApiRequestOptions): Promise<void> {
8181
return this.#getManager().flush(requestOptions);
8282
}
83+
84+
get parent(): RunMetadataUpdater {
85+
return this.#getManager().parent;
86+
}
87+
88+
get root(): RunMetadataUpdater {
89+
return this.#getManager().root;
90+
}
8391
}

packages/core/src/v3/runMetadata/manager.ts

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@ import { JSONHeroPath } from "@jsonhero/path";
22
import { dequal } from "dequal/lite";
33
import { DeserializedJson } from "../../schemas/json.js";
44
import { ApiRequestOptions } from "../zodfetch.js";
5-
import { RunMetadataManager } from "./types.js";
5+
import { RunMetadataManager, RunMetadataUpdater } from "./types.js";
66
import { MetadataStream } from "./metadataStream.js";
77
import { ApiClient } from "../apiClient/index.js";
8+
import { RunMetadataChangeOperation } from "../schemas/api.js";
89

910
const MAXIMUM_ACTIVE_STREAMS = 5;
1011
const MAXIMUM_TOTAL_STREAMS = 10;
@@ -16,6 +17,9 @@ export class StandardMetadataManager implements RunMetadataManager {
1617
// Add a Map to track active streams
1718
private activeStreams = new Map<string, MetadataStream<any>>();
1819

20+
private queuedParentOperations: Set<RunMetadataChangeOperation> = new Set();
21+
private queuedRootOperations: Set<RunMetadataChangeOperation> = new Set();
22+
1923
public runId: string | undefined;
2024

2125
constructor(
@@ -24,6 +28,52 @@ export class StandardMetadataManager implements RunMetadataManager {
2428
private streamsVersion: "v1" | "v2" = "v1"
2529
) {}
2630

31+
get parent(): RunMetadataUpdater {
32+
return {
33+
setKey: (key, value) => {
34+
this.queuedParentOperations.add({ type: "set", key, value });
35+
},
36+
deleteKey: (key) => {
37+
this.queuedParentOperations.add({ type: "delete", key });
38+
},
39+
appendKey: (key, value) => {
40+
this.queuedParentOperations.add({ type: "append", key, value });
41+
},
42+
removeFromKey: (key, value) => {
43+
this.queuedParentOperations.add({ type: "remove", key, value });
44+
},
45+
incrementKey: (key, value) => {
46+
this.queuedParentOperations.add({ type: "increment", key, value });
47+
},
48+
decrementKey: (key, value) => {
49+
this.queuedParentOperations.add({ type: "decrement", key, value });
50+
},
51+
};
52+
}
53+
54+
get root(): RunMetadataUpdater {
55+
return {
56+
setKey: (key, value) => {
57+
this.queuedRootOperations.add({ type: "set", key, value });
58+
},
59+
deleteKey: (key) => {
60+
this.queuedRootOperations.add({ type: "delete", key });
61+
},
62+
appendKey: (key, value) => {
63+
this.queuedRootOperations.add({ type: "append", key, value });
64+
},
65+
removeFromKey: (key, value) => {
66+
this.queuedRootOperations.add({ type: "remove", key, value });
67+
},
68+
incrementKey: (key, value) => {
69+
this.queuedRootOperations.add({ type: "increment", key, value });
70+
},
71+
decrementKey: (key, value) => {
72+
this.queuedRootOperations.add({ type: "decrement", key, value });
73+
},
74+
};
75+
}
76+
2777
public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
2878
this.store = metadata ?? {};
2979
}
@@ -294,17 +344,24 @@ export class StandardMetadataManager implements RunMetadataManager {
294344
return;
295345
}
296346

297-
if (!this.store) {
298-
return;
299-
}
300-
301-
if (!this.hasChanges) {
347+
if (!this.#needsFlush()) {
302348
return;
303349
}
304350

305351
try {
306352
this.hasChanges = false;
307-
await this.apiClient.updateRunMetadata(this.runId, { metadata: this.store }, requestOptions);
353+
354+
const parentOperations = Array.from(this.queuedParentOperations);
355+
this.queuedParentOperations.clear();
356+
357+
const rootOperations = Array.from(this.queuedRootOperations);
358+
this.queuedRootOperations.clear();
359+
360+
await this.apiClient.updateRunMetadata(
361+
this.runId,
362+
{ metadata: this.store ?? {}, parentOperations, rootOperations },
363+
requestOptions
364+
);
308365
} catch (error) {
309366
this.hasChanges = true;
310367
throw error;
@@ -336,4 +393,10 @@ export class StandardMetadataManager implements RunMetadataManager {
336393
this.flushTimeoutId = null;
337394
}
338395
}
396+
397+
#needsFlush(): boolean {
398+
return (
399+
this.hasChanges || this.queuedParentOperations.size > 0 || this.queuedRootOperations.size > 0
400+
);
401+
}
339402
}

packages/core/src/v3/runMetadata/metadataSync.ts

Whitespace-only changes.

0 commit comments

Comments
 (0)