Skip to content

Commit a0fb890

Browse files
committed
v3: cancel subtasks when parent task runs are cancelled
1 parent d2cb36c commit a0fb890

File tree

16 files changed

+365
-89
lines changed

16 files changed

+365
-89
lines changed

apps/webapp/app/components/runs/v3/CancelRunDialog.tsx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog
2323
<DialogContent key="cancel">
2424
<DialogHeader>Cancel this run?</DialogHeader>
2525
<DialogDescription>
26-
Canceling a run will stop execution. If you want to run this later you will have to replay
27-
the entire run with the original payload.
26+
Canceling a run will stop execution, along with any executing subtasks.
2827
</DialogDescription>
2928
<DialogFooter>
3029
<Form action={`/resources/taskruns/${runFriendlyId}/cancel`} method="post">

apps/webapp/app/entry.server.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ export { apiRateLimiter } from "./services/apiRateLimit.server";
185185
export { socketIo } from "./v3/handleSocketIo.server";
186186
export { wss } from "./v3/handleWebsockets.server";
187187
export { registryProxy } from "./v3/registryProxy.server";
188+
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
188189
import { eventLoopMonitor } from "./eventLoopMonitor.server";
189190
import { env } from "./env.server";
190191

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 76 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import {
1616
parseCronItems,
1717
Logger as GraphileLogger,
1818
} from "graphile-worker";
19-
import { SpanKind, trace } from "@opentelemetry/api";
19+
import { SpanKind, SpanStatusCode, context, propagation, trace } from "@opentelemetry/api";
2020

2121
import omit from "lodash.omit";
2222
import { z } from "zod";
@@ -25,13 +25,20 @@ import { PgListenService } from "~/services/db/pgListen.server";
2525
import { workerLogger as logger } from "~/services/logger.server";
2626
import { flattenAttributes } from "@trigger.dev/core/v3";
2727
import { env } from "~/env.server";
28+
import { getHttpContext } from "~/services/httpAsyncStorage.server";
2829

2930
const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");
3031

3132
export interface MessageCatalogSchema {
3233
[key: string]: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any>;
3334
}
3435

36+
const ZodWorkerMessageSchema = z.object({
37+
version: z.literal("1"),
38+
payload: z.unknown(),
39+
context: z.record(z.string().optional()).optional(),
40+
});
41+
3542
const RawCronPayloadSchema = z.object({
3643
_cron: z.object({
3744
ts: z.coerce.date(),
@@ -338,11 +345,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
338345
}
339346
}
340347

341-
const { job, durationInMs } = await this.#addJob(
342-
identifier as string,
343-
payload,
344-
spec,
345-
options?.tx ?? this.#prisma
348+
const { job, durationInMs } = await tracer.startActiveSpan(
349+
`Enqueue ${identifier as string}`,
350+
{
351+
kind: SpanKind.PRODUCER,
352+
attributes: {
353+
"job.task_identifier": identifier as string,
354+
"job.payload": payload,
355+
"job.priority": spec.priority,
356+
"job.run_at": spec.runAt?.toISOString(),
357+
"job.jobKey": spec.jobKey,
358+
"job.flags": spec.flags,
359+
"job.max_attempts": spec.maxAttempts,
360+
"worker.name": this.#name,
361+
},
362+
},
363+
async (span) => {
364+
try {
365+
const results = await this.#addJob(
366+
identifier as string,
367+
payload,
368+
spec,
369+
options?.tx ?? this.#prisma
370+
);
371+
372+
return results;
373+
} catch (error) {
374+
if (error instanceof Error) {
375+
span.recordException(error);
376+
} else {
377+
span.recordException(new Error(String(error)));
378+
}
379+
380+
span.setStatus({ code: SpanStatusCode.ERROR });
381+
382+
throw error;
383+
} finally {
384+
span.end();
385+
}
386+
}
346387
);
347388

348389
logger.debug("Enqueued worker task", {
@@ -375,6 +416,18 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
375416
) {
376417
const now = performance.now();
377418

419+
let $payload = payload;
420+
421+
if (!getHttpContext()) {
422+
const $context = {};
423+
propagation.inject(context.active(), $context);
424+
$payload = {
425+
version: "1",
426+
payload,
427+
context: $context,
428+
};
429+
}
430+
378431
const results = await tx.$queryRawUnsafe(
379432
`SELECT * FROM ${this.graphileWorkerSchema}.add_job(
380433
identifier => $1::text,
@@ -387,7 +440,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
387440
job_key_mode => $8::text
388441
)`,
389442
identifier,
390-
JSON.stringify(payload),
443+
JSON.stringify($payload),
391444
spec.runAt || null,
392445
spec.maxAttempts || null,
393446
spec.jobKey || null,
@@ -401,6 +454,12 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
401454
const rows = AddJobResultsSchema.safeParse(results);
402455

403456
if (!rows.success) {
457+
logger.debug("results returned from add_job could not be parsed", {
458+
identifier,
459+
$payload,
460+
spec,
461+
});
462+
404463
throw new Error(
405464
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
406465
);
@@ -543,7 +602,11 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
543602
throw new Error(`Unknown message type: ${String(typeName)}`);
544603
}
545604

546-
const payload = messageSchema.parse(rawPayload);
605+
const messagePayload = ZodWorkerMessageSchema.safeParse(rawPayload);
606+
607+
const payload = messageSchema.parse(
608+
messagePayload.success ? messagePayload.data.payload : rawPayload
609+
);
547610
const job = helpers.job;
548611

549612
logger.debug("Received worker task, calling handler", {
@@ -558,6 +621,10 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
558621
throw new Error(`No task for message type: ${String(typeName)}`);
559622
}
560623

624+
const activeContext = messagePayload.success
625+
? propagation.extract(context.active(), messagePayload.data.context ?? {})
626+
: undefined;
627+
561628
await tracer.startActiveSpan(
562629
`Run ${typeName as string}`,
563630
{
@@ -581,6 +648,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
581648
"worker.name": this.#name,
582649
},
583650
},
651+
activeContext ?? context.active(),
584652
async (span) => {
585653
try {
586654
await task.handler(payload, job, helpers);

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { env } from "~/env.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
9+
import { ServiceValidationError } from "~/v3/services/baseService.server";
910
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
1011
import { startActiveSpan } from "~/v3/tracer.server";
1112

@@ -92,18 +93,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
9293
traceContext,
9394
});
9495

95-
const run = await service.call(
96-
taskId,
97-
authenticationResult.environment,
98-
{ ...body.data },
99-
// { ...body.data, payload: (anyBody as any).payload },
100-
{
101-
idempotencyKey: idempotencyKey ?? undefined,
102-
triggerVersion: triggerVersion ?? undefined,
103-
traceContext,
104-
spanParentAsLink: spanParentAsLink === 1,
105-
}
106-
);
96+
const run = await service.call(taskId, authenticationResult.environment, body.data, {
97+
idempotencyKey: idempotencyKey ?? undefined,
98+
triggerVersion: triggerVersion ?? undefined,
99+
traceContext,
100+
spanParentAsLink: spanParentAsLink === 1,
101+
});
107102

108103
if (!run) {
109104
return json({ error: "Task not found" }, { status: 404 });
@@ -113,7 +108,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
113108
id: run.friendlyId,
114109
});
115110
} catch (error) {
116-
if (error instanceof Error) {
111+
if (error instanceof ServiceValidationError) {
112+
return json({ error: error.message }, { status: 422 });
113+
} else if (error instanceof Error) {
117114
return json({ error: error.message }, { status: 400 });
118115
}
119116

apps/webapp/app/routes/metrics.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import { LoaderFunctionArgs } from "@remix-run/server-runtime";
22
import { metricsRegister } from "~/metrics.server";
3+
import { logger } from "~/services/logger.server";
34

45
export async function loader({ request }: LoaderFunctionArgs) {
6+
logger.debug("Getting metrics from the metrics register");
7+
58
return new Response(await metricsRegister.metrics(), {
69
headers: {
710
"Content-Type": metricsRegister.contentType,

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ function RunActionButtons({ span }: { span: Span }) {
261261

262262
if (span.isPartial) {
263263
return (
264-
<Dialog>
264+
<Dialog key="in-progress">
265265
<LinkButton
266266
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
267267
LeadingIcon={CloudArrowDownIcon}
@@ -290,7 +290,7 @@ function RunActionButtons({ span }: { span: Span }) {
290290
}
291291

292292
return (
293-
<Dialog>
293+
<Dialog key="complete">
294294
<LinkButton
295295
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
296296
LeadingIcon={CloudArrowDownIcon}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { AsyncLocalStorage } from "node:async_hooks";
2+
3+
export type HttpLocalStorage = {
4+
requestId: string;
5+
path: string;
6+
host: string;
7+
};
8+
9+
const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
10+
11+
export type RunWithHttpContextFunction = <T>(context: HttpLocalStorage, fn: () => T) => T;
12+
13+
export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T {
14+
return httpLocalStorage.run(context, fn);
15+
}
16+
17+
export function getHttpContext(): HttpLocalStorage | undefined {
18+
return httpLocalStorage.getStore();
19+
}

apps/webapp/app/services/logger.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { LogLevel } from "@trigger.dev/core-backend";
22
import { Logger } from "@trigger.dev/core-backend";
33
import { sensitiveDataReplacer } from "./sensitiveDataReplacer";
44
import { AsyncLocalStorage } from "async_hooks";
5+
import { getHttpContext } from "./httpAsyncStorage.server";
56

67
const currentFieldsStore = new AsyncLocalStorage<Record<string, unknown>>();
78

@@ -16,7 +17,8 @@ export const logger = new Logger(
1617
sensitiveDataReplacer,
1718
() => {
1819
const fields = currentFieldsStore.getStore();
19-
return fields ? { ...fields } : {};
20+
const httpContext = getHttpContext();
21+
return { ...fields, http: httpContext };
2022
}
2123
);
2224

apps/webapp/app/v3/services/baseService.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ export abstract class BaseService {
1818
try {
1919
return await fn(span);
2020
} catch (e) {
21+
if (e instanceof ServiceValidationError) {
22+
throw e;
23+
}
24+
2125
if (e instanceof Error) {
2226
span.recordException(e);
2327
} else {

0 commit comments

Comments
 (0)