Skip to content

Commit 3543bc6

Browse files
committed
Added useBatch hook
1 parent bd2b9a0 commit 3543bc6

File tree

23 files changed

+616
-4833
lines changed

23 files changed

+616
-4833
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,10 +104,20 @@ export async function action({ request, params }: ActionFunctionArgs) {
104104
return json({ error: "Task not found" }, { status: 404 });
105105
}
106106

107-
return json({
108-
batchId: result.batch.friendlyId,
109-
runs: result.runs,
110-
});
107+
return json(
108+
{
109+
batchId: result.batch.friendlyId,
110+
runs: result.runs,
111+
},
112+
{
113+
headers: {
114+
"x-trigger-jwt-claims": JSON.stringify({
115+
sub: authenticationResult.environment.id,
116+
pub: true,
117+
}),
118+
},
119+
}
120+
);
111121
} catch (error) {
112122
if (error instanceof Error) {
113123
return json({ error: error.message }, { status: 400 });

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,9 +107,19 @@ export async function action({ request, params }: ActionFunctionArgs) {
107107
return json({ error: "Task not found" }, { status: 404 });
108108
}
109109

110-
return json({
111-
id: run.friendlyId,
112-
});
110+
return json(
111+
{
112+
id: run.friendlyId,
113+
},
114+
{
115+
headers: {
116+
"x-trigger-jwt-claims": JSON.stringify({
117+
sub: authenticationResult.environment.id,
118+
pub: true,
119+
}),
120+
},
121+
}
122+
);
113123
} catch (error) {
114124
if (error instanceof ServiceValidationError) {
115125
return json({ error: error.message }, { status: 422 });
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
2+
import { z } from "zod";
3+
import { $replica } from "~/db.server";
4+
import { env } from "~/env.server";
5+
import { permittedToReadBatch } from "~/services/accessControl.server";
6+
import { authenticateApiRequest } from "~/services/apiAuth.server";
7+
import { logger } from "~/services/logger.server";
8+
import { makeApiCors } from "~/utils/apiCors";
9+
import { longPollingFetch } from "~/utils/longPollingFetch";
10+
11+
const ParamsSchema = z.object({
12+
batchId: z.string(),
13+
});
14+
15+
export async function loader({ request, params }: ActionFunctionArgs) {
16+
const apiCors = makeApiCors(request);
17+
18+
if (request.method.toUpperCase() === "OPTIONS") {
19+
return apiCors(json({}));
20+
}
21+
22+
// Authenticate the request
23+
const authenticationResult = await authenticateApiRequest(request, { allowJWT: true });
24+
25+
if (!authenticationResult) {
26+
return apiCors(json({ error: "Invalid or Missing API Key" }, { status: 401 }));
27+
}
28+
29+
const parsedParams = ParamsSchema.safeParse(params);
30+
31+
if (!parsedParams.success) {
32+
return apiCors(
33+
json(
34+
{ error: "Invalid request parameters", issues: parsedParams.error.issues },
35+
{ status: 400 }
36+
)
37+
);
38+
}
39+
40+
if (!permittedToReadBatch(authenticationResult, parsedParams.data.batchId)) {
41+
return apiCors(json({ error: "Unauthorized" }, { status: 403 }));
42+
}
43+
44+
try {
45+
const batchRun = await $replica.batchTaskRun.findFirst({
46+
where: {
47+
friendlyId: parsedParams.data.batchId,
48+
runtimeEnvironmentId: authenticationResult.environment.id,
49+
},
50+
});
51+
52+
if (!batchRun) {
53+
return apiCors(json({ error: "Batch Run not found" }, { status: 404 }));
54+
}
55+
56+
const url = new URL(request.url);
57+
const originUrl = new URL(`${env.ELECTRIC_ORIGIN}/v1/shape/public."TaskRun"`);
58+
url.searchParams.forEach((value, key) => {
59+
originUrl.searchParams.set(key, value);
60+
});
61+
62+
originUrl.searchParams.set("where", `"batchId"='${batchRun.id}'`);
63+
64+
const finalUrl = originUrl.toString();
65+
66+
return longPollingFetch(finalUrl);
67+
} catch (error) {
68+
if (error instanceof Response) {
69+
// Error responses from longPollingFetch
70+
return apiCors(error);
71+
} else if (error instanceof TypeError) {
72+
// Unexpected errors
73+
logger.error("Unexpected error in loader:", { error: error.message });
74+
return apiCors(new Response("An unexpected error occurred", { status: 500 }));
75+
} else {
76+
// Unknown errors
77+
logger.error("Unknown error occurred in loader, not Error", { error: JSON.stringify(error) });
78+
return apiCors(new Response("An unknown error occurred", { status: 500 }));
79+
}
80+
}
81+
}

apps/webapp/app/services/accessControl.server.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,40 @@ export function permittedToReadRun(
4141

4242
return false;
4343
}
44+
45+
export function permittedToReadBatch(
46+
authenticationResult: ApiAuthenticationResult,
47+
batchId: string
48+
): boolean {
49+
if (authenticationResult.type === "PRIVATE") {
50+
return true;
51+
}
52+
53+
if (authenticationResult.type === "PUBLIC") {
54+
return true;
55+
}
56+
57+
if (!authenticationResult.claims) {
58+
return false;
59+
}
60+
61+
const parsedClaims = ClaimsSchema.safeParse(authenticationResult.claims);
62+
63+
if (!parsedClaims.success) {
64+
return false;
65+
}
66+
67+
if (parsedClaims.data.permissions?.includes("read:runs")) {
68+
return true;
69+
}
70+
71+
if (parsedClaims.data.permissions?.includes(`read:batches:${batchId}`)) {
72+
return true;
73+
}
74+
75+
if (parsedClaims.data.permissions?.includes(batchId)) {
76+
return true;
77+
}
78+
79+
return false;
80+
}

packages/core/src/v3/apiClient/index.ts

Lines changed: 55 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
ListRunsQueryParams,
5454
UpdateEnvironmentVariableParams,
5555
} from "./types.js";
56+
import { generateJWT } from "../jwt.js";
5657

5758
export type {
5859
CreateEnvironmentVariableParams,
@@ -167,7 +168,26 @@ export class ApiClient {
167168
body: JSON.stringify(body),
168169
},
169170
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
170-
);
171+
)
172+
.withResponse()
173+
.then(async ({ response, data }) => {
174+
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
175+
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
176+
177+
const jwt = await generateJWT({
178+
secretKey: this.accessToken,
179+
payload: {
180+
...claims,
181+
permissions: [data.id],
182+
},
183+
expirationTime: "1h",
184+
});
185+
186+
return {
187+
...data,
188+
jwt,
189+
};
190+
});
171191
}
172192

173193
batchTriggerTask(
@@ -187,7 +207,26 @@ export class ApiClient {
187207
body: JSON.stringify(body),
188208
},
189209
mergeRequestOptions(this.defaultRequestOptions, requestOptions)
190-
);
210+
)
211+
.withResponse()
212+
.then(async ({ response, data }) => {
213+
const claimsHeader = response.headers.get("x-trigger-jwt-claims");
214+
const claims = claimsHeader ? JSON.parse(claimsHeader) : undefined;
215+
216+
const jwt = await generateJWT({
217+
secretKey: this.accessToken,
218+
payload: {
219+
...claims,
220+
permissions: [data.batchId],
221+
},
222+
expirationTime: "1h",
223+
});
224+
225+
return {
226+
...data,
227+
jwt,
228+
};
229+
});
191230
}
192231

193232
createUploadPayloadUrl(filename: string, requestOptions?: ZodFetchOptions) {
@@ -545,7 +584,20 @@ export class ApiClient {
545584
subscribeToRunChanges<TPayload = any, TOutput = any>(runId: string) {
546585
return runShapeStream<TPayload, TOutput>(
547586
`${this.baseUrl}/realtime/v1/runs/${runId}`,
548-
this.fetchClient
587+
this.fetchClient,
588+
{
589+
closeOnComplete: true,
590+
}
591+
);
592+
}
593+
594+
subscribeToBatchChanges<TPayload = any, TOutput = any>(batchId: string) {
595+
return runShapeStream<TPayload, TOutput>(
596+
`${this.baseUrl}/realtime/v1/batches/${batchId}`,
597+
this.fetchClient,
598+
{
599+
closeOnComplete: false,
600+
}
549601
);
550602
}
551603

packages/core/src/v3/apiClient/runStream.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@ export type RunStreamCallback<TPayload = any, TOutput = any> = (
4242
run: RunShape<TPayload, TOutput>
4343
) => void | Promise<void>;
4444

45+
export type RunShapeStreamOptions = {
46+
closeOnComplete?: boolean;
47+
};
48+
4549
export function runShapeStream<TPayload = any, TOutput = any>(
4650
url: string,
47-
fetchClient: typeof fetch
51+
fetchClient: typeof fetch,
52+
options?: RunShapeStreamOptions
4853
): RunSubscription<TPayload, TOutput> {
49-
const subscription = new RunSubscription<TPayload, TOutput>(url, fetchClient);
54+
const subscription = new RunSubscription<TPayload, TOutput>(url, fetchClient, options);
5055

5156
return subscription.init();
5257
}
@@ -59,7 +64,8 @@ export class RunSubscription<TPayload = any, TOutput = any> {
5964

6065
constructor(
6166
private url: string,
62-
private fetchClient: typeof fetch
67+
private fetchClient: typeof fetch,
68+
private options?: RunShapeStreamOptions
6369
) {
6470
this.abortController = new AbortController();
6571
}
@@ -72,7 +78,11 @@ export class RunSubscription<TPayload = any, TOutput = any> {
7278
this.url,
7379
async (shape) => {
7480
controller.enqueue(shape);
75-
if (shape.completedAt && !this.abortController.signal.aborted) {
81+
if (
82+
this.options?.closeOnComplete &&
83+
shape.completedAt &&
84+
!this.abortController.signal.aborted
85+
) {
7686
controller.close();
7787
this.abortController.abort();
7888
}

packages/core/src/v3/types/tasks.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ export type BrandedRun<T, P, O> = T & BrandRun<O, P>;
341341
export type RunHandle<TPayload, TOutput> = BrandedRun<
342342
{
343343
id: string;
344+
jwt: string;
344345
},
345346
TPayload,
346347
TOutput
@@ -355,6 +356,7 @@ export type BatchRunHandle<TPayload, TOutput> = BrandedRun<
355356
{
356357
batchId: string;
357358
runs: Array<RunHandle<TPayload, TOutput>>;
359+
jwt: string;
358360
},
359361
TOutput,
360362
TPayload
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
"use client";
2+
3+
import { AnyTask, TaskRunShape } from "@trigger.dev/core/v3";
4+
import { useEffect, useState } from "react";
5+
import { useApiClient } from "./useApiClient.js";
6+
7+
export function useBatch<TTask extends AnyTask>(batchId: string) {
8+
const [runShapes, setRunShapes] = useState<TaskRunShape<TTask>[]>([]);
9+
const [error, setError] = useState<Error | null>(null);
10+
const apiClient = useApiClient();
11+
12+
useEffect(() => {
13+
const subscription = apiClient.subscribeToBatchChanges(batchId);
14+
15+
async function iterateUpdates() {
16+
for await (const run of subscription) {
17+
setRunShapes((prevRuns) => {
18+
return insertRunShapeInOrder(prevRuns, run);
19+
});
20+
}
21+
}
22+
23+
iterateUpdates().catch((err) => {
24+
setError(err);
25+
});
26+
27+
return () => {
28+
subscription.unsubscribe();
29+
};
30+
}, [batchId]);
31+
32+
return { runs: runShapes, error };
33+
}
34+
35+
// Inserts and then orders by the run number, and ensures that the run is not duplicated
36+
function insertRunShapeInOrder<TTask extends AnyTask>(
37+
previousRuns: TaskRunShape<TTask>[],
38+
run: TaskRunShape<TTask>
39+
) {
40+
const existingRun = previousRuns.find((r) => r.id === run.id);
41+
if (existingRun) {
42+
return previousRuns.map((r) => (r.id === run.id ? run : r));
43+
}
44+
45+
const runNumber = run.number;
46+
const index = previousRuns.findIndex((r) => r.number > runNumber);
47+
if (index === -1) {
48+
return [...previousRuns, run];
49+
}
50+
51+
return [...previousRuns.slice(0, index), run, ...previousRuns.slice(index)];
52+
}

packages/react-hooks/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export * from "./contexts.js";
22
export * from "./hooks/useApiClient.js";
33
export * from "./hooks/useRun.js";
4+
export * from "./hooks/useBatch.js";

packages/trigger-sdk/src/v3/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ export {
3232
type LogLevel,
3333
} from "@trigger.dev/core/v3";
3434

35-
export { runs, type RunShape, type AnyRunShape } from "./runs.js";
35+
export { runs, type RunShape, type AnyRunShape, type TaskRunShape } from "./runs.js";
3636
export * as schedules from "./schedules/index.js";
3737
export * as envvars from "./envvars.js";
3838
export type { ImportEnvironmentVariablesParams } from "./envvars.js";

packages/trigger-sdk/src/v3/runs.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type {
55
RescheduleRunRequestBody,
66
RunShape,
77
AnyRunShape,
8+
TaskRunShape,
89
RunSubscription,
910
Prettify,
1011
} from "@trigger.dev/core/v3";
@@ -32,7 +33,7 @@ export type RetrieveRunResult<TPayload = any, TOutput = any> = Prettify<
3233
}
3334
>;
3435

35-
export type { RunShape, AnyRunShape };
36+
export type { RunShape, AnyRunShape, TaskRunShape };
3637

3738
export const runs = {
3839
replay: replayRun,

0 commit comments

Comments
 (0)