Skip to content

Commit 2eb492f

Browse files
committed
Completing waitpoints using public access tokens
1 parent 6ca8cb7 commit 2eb492f

File tree

6 files changed

+79
-59
lines changed

6 files changed

+79
-59
lines changed

apps/webapp/app/routes/api.v1.waitpoints.tokens.$waitpointFriendlyId.complete.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,12 @@ const { action, loader } = createActionApiRoute(
4545
throw json({ error: "Waitpoint not found" }, { status: 404 });
4646
}
4747

48+
// if (waitpoint.status === "COMPLETED") {
49+
// return json<CompleteWaitpointTokenResponseBody>({
50+
// success: true,
51+
// });
52+
// }
53+
4854
const stringifiedData = await stringifyIO(body.data);
4955
const finalData = await conditionallyExportPacket(
5056
stringifiedData,

internal-packages/run-engine/src/engine/systems/waitpointSystem.ts

Lines changed: 36 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { timeoutError } from "@trigger.dev/core/v3";
1+
import { timeoutError, tryCatch } from "@trigger.dev/core/v3";
22
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
33
import {
44
$transaction,
@@ -66,65 +66,51 @@ export class WaitpointSystem {
6666
isError: boolean;
6767
};
6868
}): Promise<Waitpoint> {
69-
const result = await $transaction(
70-
this.$.prisma,
71-
async (tx) => {
72-
// 1. Find the TaskRuns blocked by this waitpoint
73-
const affectedTaskRuns = await tx.taskRunWaitpoint.findMany({
74-
where: { waitpointId: id },
75-
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
76-
});
69+
// 1. Find the TaskRuns blocked by this waitpoint
70+
const affectedTaskRuns = await this.$.prisma.taskRunWaitpoint.findMany({
71+
where: { waitpointId: id },
72+
select: { taskRunId: true, spanIdToComplete: true, createdAt: true },
73+
});
7774

78-
if (affectedTaskRuns.length === 0) {
79-
this.$.logger.warn(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
80-
waitpointId: id,
81-
});
82-
}
75+
if (affectedTaskRuns.length === 0) {
76+
this.$.logger.debug(`completeWaitpoint: No TaskRunWaitpoints found for waitpoint`, {
77+
waitpointId: id,
78+
});
79+
}
8380

84-
// 2. Update the waitpoint to completed (only if it's pending)
85-
let waitpoint: Waitpoint | null = null;
86-
try {
87-
waitpoint = await tx.waitpoint.update({
88-
where: { id, status: "PENDING" },
89-
data: {
90-
status: "COMPLETED",
91-
completedAt: new Date(),
92-
output: output?.value,
93-
outputType: output?.type,
94-
outputIsError: output?.isError,
95-
},
96-
});
97-
} catch (error) {
98-
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2025") {
99-
waitpoint = await tx.waitpoint.findFirst({
100-
where: { id },
101-
});
102-
} else {
103-
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { error });
104-
throw error;
105-
}
106-
}
81+
let [waitpointError, waitpoint] = await tryCatch(
82+
this.$.prisma.waitpoint.update({
83+
where: { id, status: "PENDING" },
84+
data: {
85+
status: "COMPLETED",
86+
completedAt: new Date(),
87+
output: output?.value,
88+
outputType: output?.type,
89+
outputIsError: output?.isError,
90+
},
91+
})
92+
);
10793

108-
return { waitpoint, affectedTaskRuns };
109-
},
110-
(error) => {
111-
this.$.logger.error(`completeWaitpoint: Error completing waitpoint ${id}, retrying`, {
112-
error,
94+
if (waitpointError) {
95+
if (
96+
waitpointError instanceof Prisma.PrismaClientKnownRequestError &&
97+
waitpointError.code === "P2025"
98+
) {
99+
waitpoint = await this.$.prisma.waitpoint.findFirst({
100+
where: { id },
113101
});
114-
throw error;
102+
} else {
103+
this.$.logger.log("completeWaitpoint: error updating waitpoint:", { waitpointError });
104+
throw waitpointError;
115105
}
116-
);
117-
118-
if (!result) {
119-
throw new Error(`Waitpoint couldn't be updated`);
120106
}
121107

122-
if (!result.waitpoint) {
108+
if (!waitpoint) {
123109
throw new Error(`Waitpoint ${id} not found`);
124110
}
125111

126112
//schedule trying to continue the runs
127-
for (const run of result.affectedTaskRuns) {
113+
for (const run of affectedTaskRuns) {
128114
await this.$.worker.enqueue({
129115
//this will debounce the call
130116
id: `continueRunIfUnblocked:${run.taskRunId}`,
@@ -148,7 +134,7 @@ export class WaitpointSystem {
148134
}
149135
}
150136

151-
return result.waitpoint;
137+
return waitpoint;
152138
}
153139

154140
/**

packages/core/src/v3/apiClient/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,7 @@ export class ApiClient {
692692
...claims,
693693
scopes: [`write:waitpoints:${data.id}`],
694694
},
695-
expirationTime: "1h",
695+
expirationTime: "24h",
696696
});
697697

698698
return {

packages/trigger-sdk/src/v3/auth.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,14 +53,16 @@ type PublicTokenPermissionProperties = {
5353
* Grant access to specific batch runs
5454
*/
5555
batch?: string | string[];
56+
57+
/**
58+
* Grant access to specific waitpoints
59+
*/
60+
waitpoints?: string | string[];
5661
};
5762

5863
export type PublicTokenPermissions = {
5964
read?: PublicTokenPermissionProperties;
6065

61-
/**
62-
* @deprecated use trigger instead
63-
*/
6466
write?: PublicTokenPermissionProperties;
6567

6668
/**

packages/trigger-sdk/src/v3/wait.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,12 @@ import { tracer } from "./tracer.js";
2424
import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization";
2525
import { SpanStatusCode } from "@opentelemetry/api";
2626

27+
export type CreateWaitpointTokenResponse = Prettify<
28+
CreateWaitpointTokenResponseBody & {
29+
publicAccessToken: string;
30+
}
31+
>;
32+
2733
/**
2834
* This creates a waitpoint token.
2935
* You can use this to pause a run until you complete the waitpoint (or it times out).
@@ -48,10 +54,6 @@ import { SpanStatusCode } from "@opentelemetry/api";
4854
* @param requestOptions - The request options for the waitpoint token.
4955
* @returns The waitpoint token.
5056
*/
51-
export type CreateWaitpointTokenResponse = CreateWaitpointTokenResponseBody & {
52-
publicAccessToken: string;
53-
};
54-
5557
function createToken(
5658
options?: CreateWaitpointTokenRequestBody,
5759
requestOptions?: ApiRequestOptions

references/hello-world/src/trigger/waits.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { logger, wait, task, retry, idempotencyKeys } from "@trigger.dev/sdk/v3";
1+
import { logger, wait, task, retry, idempotencyKeys, auth } from "@trigger.dev/sdk/v3";
22

33
type Token = {
44
status: "approved" | "pending" | "rejected";
@@ -8,13 +8,15 @@ export const waitToken = task({
88
id: "wait-token",
99
run: async ({
1010
completeBeforeWaiting = false,
11+
completeWithPublicToken = false,
1112
idempotencyKey,
1213
idempotencyKeyTTL,
1314
completionDelay,
1415
timeout,
1516
tags,
1617
}: {
1718
completeBeforeWaiting?: boolean;
19+
completeWithPublicToken?: boolean;
1820
idempotencyKey?: string;
1921
idempotencyKeyTTL?: string;
2022
completionDelay?: number;
@@ -39,6 +41,28 @@ export const waitToken = task({
3941
});
4042
logger.log("Token2", token2);
4143

44+
const publicAccessToken = await auth.createPublicToken({
45+
scopes: {
46+
write: {
47+
waitpoints: token.id,
48+
},
49+
},
50+
expirationTime: "1h",
51+
});
52+
53+
logger.log("Public access token", { publicAccessToken });
54+
55+
if (completeWithPublicToken) {
56+
await auth.withAuth(
57+
{
58+
accessToken: token.publicAccessToken,
59+
},
60+
async () => {
61+
await wait.completeToken<Token>(token.id, { status: "approved" });
62+
}
63+
);
64+
}
65+
4266
if (completeBeforeWaiting) {
4367
await wait.completeToken<Token>(token.id, { status: "approved" });
4468
await wait.for({ seconds: 5 });

0 commit comments

Comments
 (0)