Skip to content

Commit 0e82698

Browse files
authored
v4: Waitpoint PAT completion, fix dequeuing without retry config, fixed metadata (#1826)
* WIP * Completing waitpoints using public access tokens * fixed example * adding agent-loops reference, adding useWaitToken react hook, adding a way to throw an error to output from the run, adding an icon option to logger.trace * Fixed metadata system * Check if the waitpoint is COMPLETED already in the complete endpoint and return true
1 parent be02439 commit 0e82698

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+2076
-398
lines changed

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.complete.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,20 @@ import { logger } from "~/services/logger.server";
1313
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
1414
import { engine } from "~/v3/runEngine.server";
1515

16-
const { action } = createActionApiRoute(
16+
const { action, loader } = createActionApiRoute(
1717
{
1818
params: z.object({
1919
waitpointFriendlyId: z.string(),
2020
}),
2121
body: CompleteWaitpointTokenRequestBody,
2222
maxContentLength: env.TASK_PAYLOAD_MAXIMUM_SIZE,
23-
method: "POST",
23+
allowJWT: true,
24+
authorization: {
25+
action: "write",
26+
resource: (params) => ({ waitpoints: params.waitpointFriendlyId }),
27+
superScopes: ["write:waitpoints", "admin"],
28+
},
29+
corsStrategy: "all",
2430
},
2531
async ({ authentication, body, params }) => {
2632
// Resume tokens are actually just waitpoints
@@ -39,6 +45,12 @@ const { action } = createActionApiRoute(
3945
throw json({ error: "Waitpoint not found" }, { status: 404 });
4046
}
4147

48+
if (waitpoint.status === "COMPLETED") {
49+
return json<CompleteWaitpointTokenResponseBody>({
50+
success: true,
51+
});
52+
}
53+
4254
const stringifiedData = await stringifyIO(body.data);
4355
const finalData = await conditionallyExportPacket(
4456
stringifiedData,
@@ -65,4 +77,4 @@ const { action } = createActionApiRoute(
6577
}
6678
);
6779

68-
export { action };
80+
export { action, loader };

apps/webapp/app/routes/api.v1.waitpoints.tokens.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import {
1313
createActionApiRoute,
1414
createLoaderApiRoute,
1515
} from "~/services/routeBuilders/apiBuilder.server";
16+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
1617
import { parseDelay } from "~/utils/delays";
1718
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
1819
import { engine } from "~/v3/runEngine.server";
@@ -77,12 +78,14 @@ const { action } = createActionApiRoute(
7778
tags: bodyTags,
7879
});
7980

81+
const $responseHeaders = await responseHeaders(authentication.environment);
82+
8083
return json<CreateWaitpointTokenResponseBody>(
8184
{
8285
id: WaitpointId.toFriendlyId(result.waitpoint.id),
8386
isCached: result.isCached,
8487
},
85-
{ status: 200 }
88+
{ status: 200, headers: $responseHeaders }
8689
);
8790
} catch (error) {
8891
if (error instanceof ServiceValidationError) {
@@ -96,4 +99,17 @@ const { action } = createActionApiRoute(
9699
}
97100
);
98101

102+
async function responseHeaders(
103+
environment: AuthenticatedEnvironment
104+
): Promise<Record<string, string>> {
105+
const claimsHeader = JSON.stringify({
106+
sub: environment.id,
107+
pub: true,
108+
});
109+
110+
return {
111+
"x-trigger-jwt-claims": claimsHeader,
112+
};
113+
}
114+
99115
export { action };

apps/webapp/app/services/authorization.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed
22

3-
const ResourceTypes = ["tasks", "tags", "runs", "batch"] as const;
3+
const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints"] as const;
44

55
export type AuthorizationResources = {
66
[key in (typeof ResourceTypes)[number]]?: string | string[];

internal-packages/run-engine/src/engine/errors.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@ export function runStatusFromError(error: TaskRunError): TaskRunStatus {
4949
case "TASK_EXECUTION_ABORTED":
5050
case "TASK_EXECUTION_FAILED":
5151
case "TASK_PROCESS_SIGTERM":
52-
case "TASK_DEQUEUED_INVALID_RETRY_CONFIG":
53-
case "TASK_DEQUEUED_NO_RETRY_CONFIG":
5452
case "TASK_DID_CONCURRENT_WAIT":
5553
return "SYSTEM_FAILURE";
5654
default:

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 1 addition & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -307,41 +307,9 @@ export class DequeueSystem {
307307
task: result.task.id,
308308
rawRetryConfig: retryConfig,
309309
});
310-
311-
await this.runAttemptSystem.systemFailure({
312-
runId,
313-
error: {
314-
type: "INTERNAL_ERROR",
315-
code: "TASK_DEQUEUED_INVALID_RETRY_CONFIG",
316-
message: `Invalid retry config: ${retryConfig}`,
317-
},
318-
tx: prisma,
319-
});
320-
321-
return null;
322-
}
323-
324-
if (!parsedConfig.data) {
325-
this.$.logger.error("RunEngine.dequeueFromMasterQueue(): No retry config", {
326-
runId,
327-
task: result.task.id,
328-
rawRetryConfig: retryConfig,
329-
});
330-
331-
await this.runAttemptSystem.systemFailure({
332-
runId,
333-
error: {
334-
type: "INTERNAL_ERROR",
335-
code: "TASK_DEQUEUED_NO_RETRY_CONFIG",
336-
message: `No retry config found`,
337-
},
338-
tx: prisma,
339-
});
340-
341-
return null;
342310
}
343311

344-
maxAttempts = parsedConfig.data.maxAttempts;
312+
maxAttempts = parsedConfig.data?.maxAttempts;
345313
}
346314
//update the run
347315
const lockedTaskRun = await prisma.taskRun.update({

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { timeoutError } from "@trigger.dev/core/v3";
1+
import { timeoutError, tryCatch } from "@trigger.dev/core/v3";
22
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
33
import {
44
$transaction,
@@ -66,65 +66,51 @@ export class WaitpointSystem {
6666
isError: boolean;
6767
};
6868
}): Promise<Waitpoint> {
69-
const result = await $transaction(
70-
this.$.prisma,
71-
async (tx) => {
72-
// 1. Find the TaskRuns blocked by this waitpoint
73-
const affectedTaskRuns = await tx.taskRunWaitpoint.findMany({
74-
where: { waitpointId: id },
75-
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
76-
});
69+
// 1. Find the TaskRuns blocked by this waitpoint
70+
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
71+
where: { waitpointId: id },
72+
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
73+
});
7774

78-
if (affectedTaskRuns.length === 0) {
79-
this.$.logger.warn(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
80-
waitpointId: id,
81-
});
82-
}
75+
if (affectedTaskRuns.length === 0) {
76+
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
77+
waitpointId: id,
78+
});
79+
}
8380

84-
// 2. Update the waitpoint to completed (only if it's pending)
85-
let waitpoint: Waitpoint | null = null;
86-
try {
87-
waitpoint = await tx.waitpoint.update({
88-
where: { id, status: "PENDING" },
89-
data: {
90-
status: "COMPLETED",
91-
completedAt: new Date(),
92-
output: output?.value,
93-
outputType: output?.type,
94-
outputIsError: output?.isError,
95-
},
96-
});
97-
} catch (error) {
98-
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
99-
waitpoint = await tx.waitpoint.findFirst({
100-
where: { id },
101-
});
102-
} else {
103-
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { error });
104-
throw error;
105-
}
106-
}
81+
let [waitpointError, waitpoint] = await tryCatch(
82+
this.$.prisma.waitpoint.update({
83+
where: { id, status: "PENDING" },
84+
data: {
85+
status: "COMPLETED",
86+
completedAt: new Date(),
87+
output: output?.value,
88+
outputType: output?.type,
89+
outputIsError: output?.isError,
90+
},
91+
})
92+
);
10793

108-
return { waitpoint, affectedTaskRuns };
109-
},
110-
(error) => {
111-
this.$.logger.error(`completeWaitpoint: Error completing waitpoint ${id}, retrying`, {
112-
error,
94+
if (waitpointError) {
95+
if (
96+
waitpointError instanceof Prisma.PrismaClientKnownRequestError &&
97+
waitpointError.code === "P2025"
98+
) {
99+
waitpoint = await this.$.prisma.waitpoint.findFirst({
100+
where: { id },
113101
});
114-
throw error;
102+
} else {
103+
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { waitpointError });
104+
throw waitpointError;
115105
}
116-
);
117-
118-
if (!result) {
119-
throw new Error(`Waitpoint couldn't be updated`);
120106
}
121107

122-
if (!result.waitpoint) {
108+
if (!waitpoint) {
123109
throw new Error(`Waitpoint ${id} not found`);
124110
}
125111

126112
//schedule trying to continue the runs
127-
for (const run of result.affectedTaskRuns) {
113+
for (const run of affectedTaskRuns) {
128114
await this.$.worker.enqueue({
129115
//this will debounce the call
130116
id: `continueRunIfUnblocked:${run.taskRunId}`,
@@ -148,7 +134,7 @@ export class WaitpointSystem {
148134
}
149135
}
150136

151-
return result.waitpoint;
137+
return waitpoint;
152138
}
153139

154140
/**

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,8 @@ const zodIpc = new ZodIpcConnection({
378378
return;
379379
}
380380

381+
runMetadataManager.runId = execution.run.id;
382+
381383
const executor = new TaskExecutor(task, {
382384
tracer,
383385
tracingSDK,

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ const zodIpc = new ZodIpcConnection({
376376
return;
377377
}
378378

379+
runMetadataManager.runId = execution.run.id;
380+
379381
const executor = new TaskExecutor(task, {
380382
tracer,
381383
tracingSDK,

packages/core/src/v3/apiClient/core.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { calculateNextRetryDelay } from "../utils/retries.js";
55
import { ApiConnectionError, ApiError, ApiSchemaValidationError } from "./errors.js";
66

77
import { Attributes, context, propagation, Span } from "@opentelemetry/api";
8-
import {suppressTracing} from "@opentelemetry/core"
8+
import { suppressTracing } from "@opentelemetry/core";
99
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
1010
import type { TriggerTracer } from "../tracer.js";
1111
import { accessoryAttributes } from "../utils/styleAttributes.js";
@@ -27,14 +27,14 @@ export const defaultRetryOptions = {
2727
randomize: false,
2828
} satisfies RetryOptions;
2929

30-
export type ZodFetchOptions<T = unknown> = {
30+
export type ZodFetchOptions<TData = any> = {
3131
retry?: RetryOptions;
3232
tracer?: TriggerTracer;
3333
name?: string;
3434
attributes?: Attributes;
3535
icon?: string;
36-
onResponseBody?: (body: T, span: Span) => void;
37-
prepareData?: (data: T) => Promise<T> | T;
36+
onResponseBody?: (body: TData, span: Span) => void;
37+
prepareData?: (data: TData, response: Response) => Promise<TData> | TData;
3838
};
3939

4040
export type AnyZodFetchOptions = ZodFetchOptions<any>;
@@ -144,7 +144,14 @@ export function zodfetchOffsetLimitPage<TItemSchema extends z.ZodTypeAny>(
144144

145145
const fetchResult = _doZodFetch(offsetLimitPageSchema, $url.href, requestInit, options);
146146

147-
return new OffsetLimitPagePromise(fetchResult, schema, url, params, requestInit, options);
147+
return new OffsetLimitPagePromise(
148+
fetchResult as Promise<ZodFetchResult<OffsetLimitPageResponse<z.output<TItemSchema>>>>,
149+
schema,
150+
url,
151+
params,
152+
requestInit,
153+
options
154+
);
148155
}
149156

150157
type ZodFetchResult<T> = {
@@ -188,7 +195,7 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
188195
schema: TResponseBodySchema,
189196
url: string,
190197
requestInit?: PromiseOrValue<RequestInit>,
191-
options?: ZodFetchOptions
198+
options?: ZodFetchOptions<z.output<TResponseBodySchema>>
192199
): Promise<ZodFetchResult<z.output<TResponseBodySchema>>> {
193200
let $requestInit = await requestInit;
194201

@@ -202,7 +209,7 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
202209
}
203210

204211
if (options?.prepareData) {
205-
result.data = await options.prepareData(result.data);
212+
result.data = await options.prepareData(result.data, result.response);
206213
}
207214

208215
return result;

0 commit comments

Comments
 (0)