Skip to content

Commit 4f95c9d

Browse files
authored
v3: Cancel awaited subtasks and reliable rate-limit recovery (#1200)
* v3: cancel subtasks when parent task runs are cancelled * v3: recover from server rate limiting errors in a more reliable way - Changing from sliding window to token bucket in the API rate limiter, to help smooth out traffic - Adding spans to the API Client core & SDK functions - Added waiting spans when retrying in the API Client - Retrying in the API Client now respects the x-ratelimit-reset header - Retrying ApiErrors in tasks now respects the x-ratelimit-reset header - Added AbortTaskRunError that when thrown will stop retries - Added idempotency key SDK functions and automatically injecting the run ID when inside a task - Added the ability to configure ApiRequestOptions (retries only for now) globally and on specific calls - Implement the maxAttempts TaskRunOption (it wasn’t doing anything before) * Adding some docs about the request options * Fix type error * Remove context propagation through graphile jobs * Remove logger * only select a subset of task run columns * limit columns selected in batchTrigger as well * added idempotency doc * allow scoped idempotency keys, and fixed an issue with the unique index on BatchTaskRun and TaskRun * Removed old cancel task run children code
1 parent b53a575 commit 4f95c9d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+2600
-631
lines changed

.changeset/lemon-sloths-hide.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
v3: recover from server rate limiting errors in a more reliable way

apps/webapp/app/components/runs/v3/CancelRunDialog.tsx

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog
2323
<DialogContent key="cancel">
2424
<DialogHeader>Cancel this run?</DialogHeader>
2525
<DialogDescription>
26-
Canceling a run will stop execution. If you want to run this later you will have to replay
27-
the entire run with the original payload.
26+
Canceling a run will stop execution, along with any executing subtasks.
2827
</DialogDescription>
2928
<DialogFooter>
3029
<Form action={`/resources/taskruns/${runFriendlyId}/cancel`} method="post">

apps/webapp/app/entry.server.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import {
1616
} from "./components/primitives/OperatingSystemProvider";
1717
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
1818
import { singleton } from "./utils/singleton";
19-
import { logger } from "./services/logger.server";
2019

2120
const ABORT_DELAY = 30000;
2221

@@ -186,6 +185,7 @@ export { apiRateLimiter } from "./services/apiRateLimit.server";
186185
export { socketIo } from "./v3/handleSocketIo.server";
187186
export { wss } from "./v3/handleWebsockets.server";
188187
export { registryProxy } from "./v3/registryProxy.server";
188+
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
189189
import { eventLoopMonitor } from "./eventLoopMonitor.server";
190190
import { env } from "./env.server";
191191

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ const EnvironmentSchema = z.object({
9797
* @example "1000ms"
9898
* @example "1000s"
9999
*/
100-
API_RATE_LIMIT_WINDOW: z.string().default("60s"),
101-
API_RATE_LIMIT_MAX: z.coerce.number().int().default(600),
100+
API_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"), // refill 250 tokens every 10 seconds
101+
API_RATE_LIMIT_MAX: z.coerce.number().int().default(750), // allow bursts of 750 requests
102+
API_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(250), // refill 250 tokens every 10 seconds
102103
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
103104
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),
104105

apps/webapp/app/platform/zodWorker.server.ts

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
12
import type {
23
CronItem,
34
CronItemOptions,
@@ -11,20 +12,19 @@ import type {
1112
WorkerUtils,
1213
} from "graphile-worker";
1314
import {
15+
Logger as GraphileLogger,
1416
run as graphileRun,
1517
makeWorkerUtils,
1618
parseCronItems,
17-
Logger as GraphileLogger,
1819
} from "graphile-worker";
19-
import { SpanKind, trace } from "@opentelemetry/api";
2020

21+
import { flattenAttributes } from "@trigger.dev/core/v3";
2122
import omit from "lodash.omit";
2223
import { z } from "zod";
2324
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
25+
import { env } from "~/env.server";
2426
import { PgListenService } from "~/services/db/pgListen.server";
2527
import { workerLogger as logger } from "~/services/logger.server";
26-
import { flattenAttributes } from "@trigger.dev/core/v3";
27-
import { env } from "~/env.server";
2828

2929
const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");
3030

@@ -338,11 +338,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
338338
}
339339
}
340340

341-
const { job, durationInMs } = await this.#addJob(
342-
identifier as string,
343-
payload,
344-
spec,
345-
options?.tx ?? this.#prisma
341+
const { job, durationInMs } = await tracer.startActiveSpan(
342+
`Enqueue ${identifier as string}`,
343+
{
344+
kind: SpanKind.PRODUCER,
345+
attributes: {
346+
"job.task_identifier": identifier as string,
347+
"job.payload": payload,
348+
"job.priority": spec.priority,
349+
"job.run_at": spec.runAt?.toISOString(),
350+
"job.jobKey": spec.jobKey,
351+
"job.flags": spec.flags,
352+
"job.max_attempts": spec.maxAttempts,
353+
"worker.name": this.#name,
354+
},
355+
},
356+
async (span) => {
357+
try {
358+
const results = await this.#addJob(
359+
identifier as string,
360+
payload,
361+
spec,
362+
options?.tx ?? this.#prisma
363+
);
364+
365+
return results;
366+
} catch (error) {
367+
if (error instanceof Error) {
368+
span.recordException(error);
369+
} else {
370+
span.recordException(new Error(String(error)));
371+
}
372+
373+
span.setStatus({ code: SpanStatusCode.ERROR });
374+
375+
throw error;
376+
} finally {
377+
span.end();
378+
}
379+
}
346380
);
347381

348382
logger.debug("Enqueued worker task", {
@@ -401,6 +435,12 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
401435
const rows = AddJobResultsSchema.safeParse(results);
402436

403437
if (!rows.success) {
438+
logger.debug("results returned from add_job could not be parsed", {
439+
identifier,
440+
payload,
441+
spec,
442+
});
443+
404444
throw new Error(
405445
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
406446
);
@@ -422,9 +462,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
422462
const job = AddJobResultsSchema.safeParse(result);
423463

424464
if (!job.success) {
425-
logger.debug("results returned from remove_job could not be parsed", {
426-
error: job.error.flatten(),
427-
result,
465+
logger.debug("could not remove job, job_key did not exist", {
428466
jobKey,
429467
});
430468

apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { env } from "~/env.server";
66
import { authenticateApiRequest } from "~/services/apiAuth.server";
77
import { logger } from "~/services/logger.server";
88
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
9+
import { ServiceValidationError } from "~/v3/services/baseService.server";
910
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
1011
import { startActiveSpan } from "~/v3/tracer.server";
1112

@@ -92,18 +93,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
9293
traceContext,
9394
});
9495

95-
const run = await service.call(
96-
taskId,
97-
authenticationResult.environment,
98-
{ ...body.data },
99-
// { ...body.data, payload: (anyBody as any).payload },
100-
{
101-
idempotencyKey: idempotencyKey ?? undefined,
102-
triggerVersion: triggerVersion ?? undefined,
103-
traceContext,
104-
spanParentAsLink: spanParentAsLink === 1,
105-
}
106-
);
96+
const run = await service.call(taskId, authenticationResult.environment, body.data, {
97+
idempotencyKey: idempotencyKey ?? undefined,
98+
triggerVersion: triggerVersion ?? undefined,
99+
traceContext,
100+
spanParentAsLink: spanParentAsLink === 1,
101+
});
107102

108103
if (!run) {
109104
return json({ error: "Task not found" }, { status: 404 });
@@ -113,7 +108,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
113108
id: run.friendlyId,
114109
});
115110
} catch (error) {
116-
if (error instanceof Error) {
111+
if (error instanceof ServiceValidationError) {
112+
return json({ error: error.message }, { status: 422 });
113+
} else if (error instanceof Error) {
117114
return json({ error: error.message }, { status: 400 });
118115
}
119116

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.v3.$projectParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ function RunActionButtons({ span }: { span: Span }) {
261261

262262
if (span.isPartial) {
263263
return (
264-
<Dialog>
264+
<Dialog key="in-progress">
265265
<LinkButton
266266
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
267267
LeadingIcon={CloudArrowDownIcon}
@@ -290,7 +290,7 @@ function RunActionButtons({ span }: { span: Span }) {
290290
}
291291

292292
return (
293-
<Dialog>
293+
<Dialog key="complete">
294294
<LinkButton
295295
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
296296
LeadingIcon={CloudArrowDownIcon}

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,10 @@ export function authorizationRateLimitMiddleware({
106106
hashedAuthorizationValue
107107
);
108108

109+
const $remaining = Math.max(0, remaining); // remaining can be negative if the user has exceeded the limit, so clamp it to 0
110+
109111
res.set("x-ratelimit-limit", limit.toString());
110-
res.set("x-ratelimit-remaining", remaining.toString());
112+
res.set("x-ratelimit-remaining", $remaining.toString());
111113
res.set("x-ratelimit-reset", reset.toString());
112114

113115
if (success) {
@@ -122,12 +124,12 @@ export function authorizationRateLimitMiddleware({
122124
title: "Rate Limit Exceeded",
123125
status: 429,
124126
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
125-
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
127+
detail: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
126128
reset,
127129
limit,
128130
remaining,
129131
secondsUntilReset,
130-
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
132+
error: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
131133
},
132134
null,
133135
2
@@ -138,7 +140,11 @@ export function authorizationRateLimitMiddleware({
138140

139141
export const apiRateLimiter = authorizationRateLimitMiddleware({
140142
keyPrefix: "api",
141-
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
143+
limiter: Ratelimit.tokenBucket(
144+
env.API_RATE_LIMIT_REFILL_RATE,
145+
env.API_RATE_LIMIT_REFILL_INTERVAL as Duration,
146+
env.API_RATE_LIMIT_MAX
147+
),
142148
pathMatchers: [/^\/api/],
143149
// Allow /api/v1/tasks/:id/callback/:secret
144150
pathWhiteList: [
@@ -152,6 +158,8 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
152158
/^\/api\/v1\/sources\/http\/[^\/]+$/, // /api/v1/sources/http/$id
153159
/^\/api\/v1\/endpoints\/[^\/]+\/[^\/]+\/index\/[^\/]+$/, // /api/v1/endpoints/$environmentId/$endpointSlug/index/$indexHookIdentifier
154160
"/api/v1/timezones",
161+
"/api/v1/usage/ingest",
162+
/^\/api\/v1\/runs\/[^\/]+\/attempts$/, // /api/v1/runs/$runFriendlyId/attempts
155163
],
156164
log: {
157165
rejections: env.API_RATE_LIMIT_REJECTION_LOGS_ENABLED === "1",
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import { AsyncLocalStorage } from "node:async_hooks";
2+
3+
export type HttpLocalStorage = {
4+
requestId: string;
5+
path: string;
6+
host: string;
7+
};
8+
9+
const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();
10+
11+
export type RunWithHttpContextFunction = <T>(context: HttpLocalStorage, fn: () => T) => T;
12+
13+
export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T {
14+
return httpLocalStorage.run(context, fn);
15+
}
16+
17+
export function getHttpContext(): HttpLocalStorage | undefined {
18+
return httpLocalStorage.getStore();
19+
}

apps/webapp/app/services/logger.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import type { LogLevel } from "@trigger.dev/core-backend";
22
import { Logger } from "@trigger.dev/core-backend";
33
import { sensitiveDataReplacer } from "./sensitiveDataReplacer";
44
import { AsyncLocalStorage } from "async_hooks";
5+
import { getHttpContext } from "./httpAsyncStorage.server";
56

67
const currentFieldsStore = new AsyncLocalStorage<Record<string, unknown>>();
78

@@ -16,7 +17,8 @@ export const logger = new Logger(
1617
sensitiveDataReplacer,
1718
() => {
1819
const fields = currentFieldsStore.getStore();
19-
return fields ? { ...fields } : {};
20+
const httpContext = getHttpContext();
21+
return { ...fields, http: httpContext };
2022
}
2123
);
2224

apps/webapp/app/services/worker.server.ts

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ import { prisma } from "~/db.server";
55
import { env } from "~/env.server";
66
import { ZodWorker } from "~/platform/zodWorker.server";
77
import { eventRepository } from "~/v3/eventRepository.server";
8+
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
9+
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
10+
import { reportUsageEvent } from "~/v3/openMeter.server";
811
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
912
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
1013
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
1114
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
1215
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
16+
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
17+
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
1318
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
19+
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
1420
import { IndexDeploymentService } from "~/v3/services/indexDeployment.server";
1521
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
1622
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
@@ -44,11 +50,6 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
4450
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
4551
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
4652
import { ResumeTaskService } from "./tasks/resumeTask.server";
47-
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
48-
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
49-
import { reportUsageEvent } from "~/v3/openMeter.server";
50-
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
51-
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
5253

5354
const workerCatalog = {
5455
indexEndpoint: z.object({
@@ -185,6 +186,9 @@ const workerCatalog = {
185186
"v3.expireRun": z.object({
186187
runId: z.string(),
187188
}),
189+
"v3.cancelTaskAttemptDependencies": z.object({
190+
attemptId: z.string(),
191+
}),
188192
};
189193

190194
const executionWorkerCatalog = {
@@ -698,6 +702,15 @@ function getWorkerQueue() {
698702
return await service.call(payload.runId);
699703
},
700704
},
705+
"v3.cancelTaskAttemptDependencies": {
706+
priority: 0,
707+
maxAttempts: 8,
708+
handler: async (payload, job) => {
709+
const service = new CancelTaskAttemptDependenciesService();
710+
711+
return await service.call(payload.attemptId);
712+
},
713+
},
701714
},
702715
});
703716
}

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1411,7 +1411,9 @@ function filteredAttributes(attributes: Attributes, prefix: string): Attributes
14111411
}
14121412

14131413
function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) {
1414-
return Number(BigInt(endTime.getTime() * 1_000_000) - startTime);
1414+
const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;
1415+
1416+
return Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
14151417
}
14161418

14171419
function getNowInNanoseconds(): bigint {

apps/webapp/app/v3/services/baseService.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ export abstract class BaseService {
1818
try {
1919
return await fn(span);
2020
} catch (e) {
21+
if (e instanceof ServiceValidationError) {
22+
throw e;
23+
}
24+
2125
if (e instanceof Error) {
2226
span.recordException(e);
2327
} else {

0 commit comments

Comments
 (0)