Skip to content

v3: Cancel awaited subtasks and reliable rate-limit recovery #1200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/lemon-sloths-hide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

v3: recover from server rate limiting errors in a more reliable way
3 changes: 1 addition & 2 deletions apps/webapp/app/components/runs/v3/CancelRunDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog
<DialogContent key="cancel">
<DialogHeader>Cancel this run?</DialogHeader>
<DialogDescription>
Canceling a run will stop execution. If you want to run this later you will have to replay
the entire run with the original payload.
Canceling a run will stop execution, along with any executing subtasks.
</DialogDescription>
<DialogFooter>
<Form action={`/resources/taskruns/${runFriendlyId}/cancel`} method="post">
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import {
} from "./components/primitives/OperatingSystemProvider";
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
import { singleton } from "./utils/singleton";
import { logger } from "./services/logger.server";

const ABORT_DELAY = 30000;

Expand Down Expand Up @@ -186,6 +185,7 @@ export { apiRateLimiter } from "./services/apiRateLimit.server";
export { socketIo } from "./v3/handleSocketIo.server";
export { wss } from "./v3/handleWebsockets.server";
export { registryProxy } from "./v3/registryProxy.server";
export { runWithHttpContext } from "./services/httpAsyncStorage.server";
import { eventLoopMonitor } from "./eventLoopMonitor.server";
import { env } from "./env.server";

Expand Down
5 changes: 3 additions & 2 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ const EnvironmentSchema = z.object({
* @example "1000ms"
* @example "1000s"
*/
API_RATE_LIMIT_WINDOW: z.string().default("60s"),
API_RATE_LIMIT_MAX: z.coerce.number().int().default(600),
API_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"), // refill 250 tokens every 10 seconds
API_RATE_LIMIT_MAX: z.coerce.number().int().default(750), // allow bursts of 750 requests
API_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(250), // refix 250 tokens every 10 seconds
API_RATE_LIMIT_REQUEST_LOGS_ENABLED: z.string().default("0"),
API_RATE_LIMIT_REJECTION_LOGS_ENABLED: z.string().default("1"),

Expand Down
62 changes: 50 additions & 12 deletions apps/webapp/app/platform/zodWorker.server.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { SpanKind, SpanStatusCode, trace } from "@opentelemetry/api";
import type {
CronItem,
CronItemOptions,
Expand All @@ -11,20 +12,19 @@ import type {
WorkerUtils,
} from "graphile-worker";
import {
Logger as GraphileLogger,
run as graphileRun,
makeWorkerUtils,
parseCronItems,
Logger as GraphileLogger,
} from "graphile-worker";
import { SpanKind, trace } from "@opentelemetry/api";

import { flattenAttributes } from "@trigger.dev/core/v3";
import omit from "lodash.omit";
import { z } from "zod";
import { $replica, PrismaClient, PrismaClientOrTransaction } from "~/db.server";
import { env } from "~/env.server";
import { PgListenService } from "~/services/db/pgListen.server";
import { workerLogger as logger } from "~/services/logger.server";
import { flattenAttributes } from "@trigger.dev/core/v3";
import { env } from "~/env.server";

const tracer = trace.getTracer("zodWorker", "3.0.0.dp.1");

Expand Down Expand Up @@ -338,11 +338,45 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
}
}

const { job, durationInMs } = await this.#addJob(
identifier as string,
payload,
spec,
options?.tx ?? this.#prisma
const { job, durationInMs } = await tracer.startActiveSpan(
`Enqueue ${identifier as string}`,
{
kind: SpanKind.PRODUCER,
attributes: {
"job.task_identifier": identifier as string,
"job.payload": payload,
"job.priority": spec.priority,
"job.run_at": spec.runAt?.toISOString(),
"job.jobKey": spec.jobKey,
"job.flags": spec.flags,
"job.max_attempts": spec.maxAttempts,
"worker.name": this.#name,
},
},
async (span) => {
try {
const results = await this.#addJob(
identifier as string,
payload,
spec,
options?.tx ?? this.#prisma
);

return results;
} catch (error) {
if (error instanceof Error) {
span.recordException(error);
} else {
span.recordException(new Error(String(error)));
}

span.setStatus({ code: SpanStatusCode.ERROR });

throw error;
} finally {
span.end();
}
}
);

logger.debug("Enqueued worker task", {
Expand Down Expand Up @@ -401,6 +435,12 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const rows = AddJobResultsSchema.safeParse(results);

if (!rows.success) {
logger.debug("results returned from add_job could not be parsed", {
identifier,
payload,
spec,
});

throw new Error(
`Failed to add job to queue, zod parsing error: ${JSON.stringify(rows.error)}`
);
Expand All @@ -422,9 +462,7 @@ export class ZodWorker<TMessageCatalog extends MessageCatalogSchema> {
const job = AddJobResultsSchema.safeParse(result);

if (!job.success) {
logger.debug("results returned from remove_job could not be parsed", {
error: job.error.flatten(),
result,
logger.debug("could not remove job, job_key did not exist", {
jobKey,
});

Expand Down
23 changes: 10 additions & 13 deletions apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { env } from "~/env.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
import { ServiceValidationError } from "~/v3/services/baseService.server";
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
import { startActiveSpan } from "~/v3/tracer.server";

Expand Down Expand Up @@ -92,18 +93,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
traceContext,
});

const run = await service.call(
taskId,
authenticationResult.environment,
{ ...body.data },
// { ...body.data, payload: (anyBody as any).payload },
{
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
}
);
const run = await service.call(taskId, authenticationResult.environment, body.data, {
idempotencyKey: idempotencyKey ?? undefined,
triggerVersion: triggerVersion ?? undefined,
traceContext,
spanParentAsLink: spanParentAsLink === 1,
});

if (!run) {
return json({ error: "Task not found" }, { status: 404 });
Expand All @@ -113,7 +108,9 @@ export async function action({ request, params }: ActionFunctionArgs) {
id: run.friendlyId,
});
} catch (error) {
if (error instanceof Error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: 422 });
} else if (error instanceof Error) {
return json({ error: error.message }, { status: 400 });
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ function RunActionButtons({ span }: { span: Span }) {

if (span.isPartial) {
return (
<Dialog>
<Dialog key="in-progress">
<LinkButton
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
LeadingIcon={CloudArrowDownIcon}
Expand Down Expand Up @@ -290,7 +290,7 @@ function RunActionButtons({ span }: { span: Span }) {
}

return (
<Dialog>
<Dialog key="complete">
<LinkButton
to={v3RunDownloadLogsPath({ friendlyId: runParam })}
LeadingIcon={CloudArrowDownIcon}
Expand Down
16 changes: 12 additions & 4 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,10 @@ export function authorizationRateLimitMiddleware({
hashedAuthorizationValue
);

const $remaining = Math.max(0, remaining); // remaining can be negative if the user has exceeded the limit, so clamp it to 0

res.set("x-ratelimit-limit", limit.toString());
res.set("x-ratelimit-remaining", remaining.toString());
res.set("x-ratelimit-remaining", $remaining.toString());
res.set("x-ratelimit-reset", reset.toString());

if (success) {
Expand All @@ -122,12 +124,12 @@ export function authorizationRateLimitMiddleware({
title: "Rate Limit Exceeded",
status: 429,
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
detail: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
reset,
limit,
remaining,
secondsUntilReset,
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
error: `Rate limit exceeded ${$remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
},
null,
2
Expand All @@ -138,7 +140,11 @@ export function authorizationRateLimitMiddleware({

export const apiRateLimiter = authorizationRateLimitMiddleware({
keyPrefix: "api",
limiter: Ratelimit.slidingWindow(env.API_RATE_LIMIT_MAX, env.API_RATE_LIMIT_WINDOW as Duration),
limiter: Ratelimit.tokenBucket(
env.API_RATE_LIMIT_REFILL_RATE,
env.API_RATE_LIMIT_REFILL_INTERVAL as Duration,
env.API_RATE_LIMIT_MAX
),
pathMatchers: [/^\/api/],
// Allow /api/v1/tasks/:id/callback/:secret
pathWhiteList: [
Expand All @@ -152,6 +158,8 @@ export const apiRateLimiter = authorizationRateLimitMiddleware({
/^\/api\/v1\/sources\/http\/[^\/]+$/, // /api/v1/sources/http/$id
/^\/api\/v1\/endpoints\/[^\/]+\/[^\/]+\/index\/[^\/]+$/, // /api/v1/endpoints/$environmentId/$endpointSlug/index/$indexHookIdentifier
"/api/v1/timezones",
"/api/v1/usage/ingest",
/^\/api\/v1\/runs\/[^\/]+\/attempts$/, // /api/v1/runs/$runFriendlyId/attempts
],
log: {
rejections: env.API_RATE_LIMIT_REJECTION_LOGS_ENABLED === "1",
Expand Down
19 changes: 19 additions & 0 deletions apps/webapp/app/services/httpAsyncStorage.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { AsyncLocalStorage } from "node:async_hooks";

export type HttpLocalStorage = {
requestId: string;
path: string;
host: string;
};

const httpLocalStorage = new AsyncLocalStorage<HttpLocalStorage>();

export type RunWithHttpContextFunction = <T>(context: HttpLocalStorage, fn: () => T) => T;

export function runWithHttpContext<T>(context: HttpLocalStorage, fn: () => T): T {
return httpLocalStorage.run(context, fn);
}

export function getHttpContext(): HttpLocalStorage | undefined {
return httpLocalStorage.getStore();
}
4 changes: 3 additions & 1 deletion apps/webapp/app/services/logger.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { LogLevel } from "@trigger.dev/core-backend";
import { Logger } from "@trigger.dev/core-backend";
import { sensitiveDataReplacer } from "./sensitiveDataReplacer";
import { AsyncLocalStorage } from "async_hooks";
import { getHttpContext } from "./httpAsyncStorage.server";

const currentFieldsStore = new AsyncLocalStorage<Record<string, unknown>>();

Expand All @@ -16,7 +17,8 @@ export const logger = new Logger(
sensitiveDataReplacer,
() => {
const fields = currentFieldsStore.getStore();
return fields ? { ...fields } : {};
const httpContext = getHttpContext();
return { ...fields, http: httpContext };
}
);

Expand Down
23 changes: 18 additions & 5 deletions apps/webapp/app/services/worker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,18 @@ import { prisma } from "~/db.server";
import { env } from "~/env.server";
import { ZodWorker } from "~/platform/zodWorker.server";
import { eventRepository } from "~/v3/eventRepository.server";
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
import { reportUsageEvent } from "~/v3/openMeter.server";
import { RequeueTaskRunService } from "~/v3/requeueTaskRun.server";
import { DeliverAlertService } from "~/v3/services/alerts/deliverAlert.server";
import { PerformDeploymentAlertsService } from "~/v3/services/alerts/performDeploymentAlerts.server";
import { PerformTaskAttemptAlertsService } from "~/v3/services/alerts/performTaskAttemptAlerts.server";
import { PerformBulkActionService } from "~/v3/services/bulk/performBulkAction.server";
import { CancelTaskAttemptDependenciesService } from "~/v3/services/cancelTaskAttemptDependencies.server";
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
import { ExecuteTasksWaitingForDeployService } from "~/v3/services/executeTasksWaitingForDeploy";
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";
import { IndexDeploymentService } from "~/v3/services/indexDeployment.server";
import { ResumeBatchRunService } from "~/v3/services/resumeBatchRun.server";
import { ResumeTaskDependencyService } from "~/v3/services/resumeTaskDependency.server";
Expand Down Expand Up @@ -44,11 +50,6 @@ import { DeliverWebhookRequestService } from "./sources/deliverWebhookRequest.se
import { PerformTaskOperationService } from "./tasks/performTaskOperation.server";
import { ProcessCallbackTimeoutService } from "./tasks/processCallbackTimeout.server";
import { ResumeTaskService } from "./tasks/resumeTask.server";
import { RequeueV2Message } from "~/v3/marqs/requeueV2Message.server";
import { MarqsConcurrencyMonitor } from "~/v3/marqs/concurrencyMonitor.server";
import { reportUsageEvent } from "~/v3/openMeter.server";
import { EnqueueDelayedRunService } from "~/v3/services/enqueueDelayedRun.server";
import { ExpireEnqueuedRunService } from "~/v3/services/expireEnqueuedRun.server";

const workerCatalog = {
indexEndpoint: z.object({
Expand Down Expand Up @@ -185,6 +186,9 @@ const workerCatalog = {
"v3.expireRun": z.object({
runId: z.string(),
}),
"v3.cancelTaskAttemptDependencies": z.object({
attemptId: z.string(),
}),
};

const executionWorkerCatalog = {
Expand Down Expand Up @@ -698,6 +702,15 @@ function getWorkerQueue() {
return await service.call(payload.runId);
},
},
"v3.cancelTaskAttemptDependencies": {
priority: 0,
maxAttempts: 8,
handler: async (payload, job) => {
const service = new CancelTaskAttemptDependenciesService();

return await service.call(payload.attemptId);
},
},
},
});
}
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1411,7 +1411,9 @@ function filteredAttributes(attributes: Attributes, prefix: string): Attributes
}

function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) {
return Number(BigInt(endTime.getTime() * 1_000_000) - startTime);
const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;

return Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
}

function getNowInNanoseconds(): bigint {
Expand Down
4 changes: 4 additions & 0 deletions apps/webapp/app/v3/services/baseService.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ export abstract class BaseService {
try {
return await fn(span);
} catch (e) {
if (e instanceof ServiceValidationError) {
throw e;
}

if (e instanceof Error) {
span.recordException(e);
} else {
Expand Down
Loading
Loading