Skip to content

v2: When a run hits the rate limit reschedule the re-execution #1125

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/strong-owls-know.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/sdk": patch
"@trigger.dev/core": patch
---

When a v2 run hits the rate limit, reschedule with the reset date
10 changes: 6 additions & 4 deletions apps/webapp/app/services/apiRateLimit.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,18 @@ export function authorizationRateLimitMiddleware({
}

res.setHeader("Content-Type", "application/problem+json");
const secondsUntilReset = Math.max(0, (reset - new Date().getTime()) / 1000);
return res.status(429).send(
JSON.stringify(
{
title: "Rate Limit Exceeded",
status: 429,
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry after ${reset} seconds.`,
reset: reset,
limit: limit,
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry after ${reset} seconds.`,
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
reset,
limit,
secondsUntilReset,
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
},
null,
2
Expand Down
64 changes: 51 additions & 13 deletions apps/webapp/app/services/runs/performRunExecutionV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,10 @@ export class PerformRunExecutionV3Service {
await this.#resumeAutoYieldedRunWithCompletedTask(run, safeBody.data, durationInMs);
break;
}
case "AUTO_YIELD_RATE_LIMIT": {
await this.#rescheduleRun(run, safeBody.data.reset, durationInMs);
break;
}
case "RESUME_WITH_PARALLEL_TASK": {
await this.#resumeParallelRunWithTask(run, safeBody.data, durationInMs);

Expand Down Expand Up @@ -667,6 +671,10 @@ export class PerformRunExecutionV3Service {

break;
}
case "AUTO_YIELD_RATE_LIMIT": {
await this.#rescheduleRun(run, childError.reset, durationInMs);
break;
}
case "CANCELED": {
break;
}
Expand Down Expand Up @@ -801,9 +809,9 @@ export class PerformRunExecutionV3Service {
});
}

async #resumeAutoYieldedRun(
async #rescheduleRun(
run: FoundRun,
data: AutoYieldMetadata,
reset: number,
durationInMs: number,
executionCount: number = 1
) {
Expand All @@ -820,24 +828,14 @@ export class PerformRunExecutionV3Service {
executionCount: {
increment: executionCount,
},
autoYieldExecution: {
create: [
{
location: data.location,
timeRemaining: data.timeRemaining,
timeElapsed: data.timeElapsed,
limit: data.limit ?? 0,
},
],
},
forceYieldImmediately: false,
},
select: {
executionCount: true,
},
});

await ResumeRunService.enqueue(run, tx);
await ResumeRunService.enqueue(run, tx, new Date(reset));
});
}

Expand Down Expand Up @@ -888,6 +886,46 @@ export class PerformRunExecutionV3Service {
});
}

async #resumeAutoYieldedRun(
run: FoundRun,
data: AutoYieldMetadata,
durationInMs: number,
executionCount: number = 1
) {
await $transaction(this.#prismaClient, async (tx) => {
await tx.jobRun.update({
where: {
id: run.id,
},
data: {
status: "WAITING_TO_EXECUTE",
executionDuration: {
increment: durationInMs,
},
executionCount: {
increment: executionCount,
},
autoYieldExecution: {
create: [
{
location: data.location,
timeRemaining: data.timeRemaining,
timeElapsed: data.timeElapsed,
limit: data.limit ?? 0,
},
],
},
forceYieldImmediately: false,
},
select: {
executionCount: true,
},
});

await ResumeRunService.enqueue(run, tx);
});
}

async #retryRunWithTask(
run: FoundRun,
data: RunJobRetryWithTask,
Expand Down
9 changes: 9 additions & 0 deletions packages/core/src/schemas/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,13 @@ export type RunJobAutoYieldWithCompletedTaskExecutionError = z.infer<
typeof RunJobAutoYieldWithCompletedTaskExecutionErrorSchema
>;

export const RunJobAutoYieldRateLimitErrorSchema = z.object({
status: z.literal("AUTO_YIELD_RATE_LIMIT"),
reset: z.coerce.number(),
});

export type RunJobAutoYieldRateLimitError = z.infer<typeof RunJobAutoYieldRateLimitErrorSchema>;

export const RunJobInvalidPayloadErrorSchema = z.object({
status: z.literal("INVALID_PAYLOAD"),
errors: z.array(SchemaErrorSchema),
Expand Down Expand Up @@ -719,6 +726,7 @@ export const RunJobErrorResponseSchema = z.union([
RunJobAutoYieldExecutionErrorSchema,
RunJobAutoYieldWithCompletedTaskExecutionErrorSchema,
RunJobYieldExecutionErrorSchema,
RunJobAutoYieldRateLimitErrorSchema,
RunJobErrorSchema,
RunJobUnresolvedAuthErrorSchema,
RunJobInvalidPayloadErrorSchema,
Expand All @@ -741,6 +749,7 @@ export const RunJobResponseSchema = z.discriminatedUnion("status", [
RunJobAutoYieldExecutionErrorSchema,
RunJobAutoYieldWithCompletedTaskExecutionErrorSchema,
RunJobYieldExecutionErrorSchema,
RunJobAutoYieldRateLimitErrorSchema,
RunJobErrorSchema,
RunJobUnresolvedAuthErrorSchema,
RunJobInvalidPayloadErrorSchema,
Expand Down
25 changes: 24 additions & 1 deletion packages/trigger-sdk/src/apiClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import { env } from "node:process";

import { z } from "zod";
import { KeyValueStoreClient } from "./store/keyValueStoreClient";
import { AutoYieldRateLimitError } from "./errors";

export type ApiClientOptions = {
apiKey?: string;
Expand Down Expand Up @@ -818,6 +819,15 @@ async function zodfetchWithVersions<
return;
}

//rate limit, so we want to reschedule
if (response.status === 429) {
//unix timestamp in milliseconds
const retryAfter = response.headers.get("x-ratelimit-reset");
if (retryAfter) {
throw new AutoYieldRateLimitError(parseInt(retryAfter));
}
}

if (response.status >= 400 && response.status < 500) {
const rawBody = await safeResponseText(response);
const body = safeJsonParse(rawBody);
Expand Down Expand Up @@ -894,7 +904,7 @@ async function zodfetchWithVersions<
body: versionedSchema.parse(jsonBody),
};
} catch (error) {
if (error instanceof UnknownVersionError) {
if (error instanceof UnknownVersionError || error instanceof AutoYieldRateLimitError) {
throw error;
}

Expand Down Expand Up @@ -987,6 +997,15 @@ async function zodfetch<TResponseSchema extends z.ZodTypeAny, TOptional extends
return;
}

//rate limit, so we want to reschedule
if (response.status === 429) {
//unix timestamp in milliseconds
const retryAfter = response.headers.get("x-ratelimit-reset");
if (retryAfter) {
throw new AutoYieldRateLimitError(parseInt(retryAfter));
}
}

if (response.status >= 400 && response.status < 500) {
const body = await response.json();

Expand All @@ -1012,6 +1031,10 @@ async function zodfetch<TResponseSchema extends z.ZodTypeAny, TOptional extends

return schema.parse(jsonBody);
} catch (error) {
if (error instanceof AutoYieldRateLimitError) {
throw error;
}

if (retryCount < MAX_RETRIES) {
// retry with exponential backoff and jitter
const delay = exponentialBackoff(retryCount + 1);
Expand Down
6 changes: 6 additions & 0 deletions packages/trigger-sdk/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ export class AutoYieldWithCompletedTaskExecutionError {
) {}
}

export class AutoYieldRateLimitError {
constructor(public resetAtTimestamp: number) {}
}

export class ParsedPayloadSchemaError {
constructor(public schemaErrors: SchemaError[]) {}
}
Expand All @@ -56,6 +60,7 @@ export type TriggerInternalError =
| YieldExecutionError
| AutoYieldExecutionError
| AutoYieldWithCompletedTaskExecutionError
| AutoYieldRateLimitError
| ResumeWithParallelTaskError;

/** Use this function if you're using a `try/catch` block to catch errors.
Expand All @@ -72,6 +77,7 @@ export function isTriggerError(err: unknown): err is TriggerInternalError {
err instanceof YieldExecutionError ||
err instanceof AutoYieldExecutionError ||
err instanceof AutoYieldWithCompletedTaskExecutionError ||
err instanceof AutoYieldRateLimitError ||
err instanceof ResumeWithParallelTaskError
);
}
Expand Down
9 changes: 9 additions & 0 deletions packages/trigger-sdk/src/io.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { webcrypto } from "node:crypto";
import { ApiClient } from "./apiClient";
import {
AutoYieldExecutionError,
AutoYieldRateLimitError,
AutoYieldWithCompletedTaskExecutionError,
CanceledWithTaskError,
ErrorWithTask,
Expand Down Expand Up @@ -1460,6 +1461,14 @@ export class IO {
cachedTasksCursor: this._cachedTasksCursor,
});
} catch (error) {
if (error instanceof AutoYieldRateLimitError) {
this._logger.debug("AutoYieldRateLimitError", {
error,
});

throw error;
}

return;
}
}
Expand Down
8 changes: 8 additions & 0 deletions packages/trigger-sdk/src/triggerClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import { ApiClient } from "./apiClient";
import { ConcurrencyLimit, ConcurrencyLimitOptions } from "./concurrencyLimit";
import {
AutoYieldExecutionError,
AutoYieldRateLimitError,
AutoYieldWithCompletedTaskExecutionError,
CanceledWithTaskError,
ErrorWithTask,
Expand Down Expand Up @@ -1235,6 +1236,13 @@ export class TriggerClient {
};
}

if (error instanceof AutoYieldRateLimitError) {
return {
status: "AUTO_YIELD_RATE_LIMIT",
reset: error.resetAtTimestamp,
};
}

if (error instanceof YieldExecutionError) {
return { status: "YIELD_EXECUTION", key: error.key };
}
Expand Down
4 changes: 2 additions & 2 deletions references/job-catalog/src/stressTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ client.defineJob({
await io.runTask(
`task-${i}`,
async (task) => {
await new Promise((resolve) => setTimeout(resolve, 2000));

return {
output: "a".repeat(30),
};
},
{ name: `Task ${i}` }
);

await new Promise((resolve) => setTimeout(resolve, 2000));
}

// Now do a wait for 5 seconds
Expand Down
Loading