Skip to content

Commit 1281d40

Browse files
authored
v2: When a run hits the rate limit reschedule the re-execution (#1125)
* Fix: API rate limit error has the correct seconds until reset
* When a v2 run hits the rate limit, reschedule using the reset timestamp
* Still throw AutoYieldRateLimitErrors
* Reschedule runs that hit the rate limit
* The stress test timeout should be inside the task
* If the rate limit error is thrown, don’t retry the API request
1 parent 116766f commit 1281d40

File tree

9 files changed

+121
-20
lines changed

9 files changed

+121
-20
lines changed

.changeset/strong-owls-know.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
"@trigger.dev/core": patch
4+
---
5+
6+
When a v2 run hits the rate limit, reschedule with the reset date

apps/webapp/app/services/apiRateLimit.server.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,18 @@ export function authorizationRateLimitMiddleware({
157157
}
158158

159159
res.setHeader("Content-Type", "application/problem+json");
160+
const secondsUntilReset = Math.max(0, (reset - new Date().getTime()) / 1000);
160161
return res.status(429).send(
161162
JSON.stringify(
162163
{
163164
title: "Rate Limit Exceeded",
164165
status: 429,
165166
type: "https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/429",
166-
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry after ${reset} seconds.`,
167-
reset: reset,
168-
limit: limit,
169-
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry after ${reset} seconds.`,
167+
detail: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
168+
reset,
169+
limit,
170+
secondsUntilReset,
171+
error: `Rate limit exceeded ${remaining}/${limit} requests remaining. Retry in ${secondsUntilReset} seconds.`,
170172
},
171173
null,
172174
2

apps/webapp/app/services/runs/performRunExecutionV3.server.ts

Lines changed: 51 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,10 @@ export class PerformRunExecutionV3Service {
441441
await this.#resumeAutoYieldedRunWithCompletedTask(run, safeBody.data, durationInMs);
442442
break;
443443
}
444+
case "AUTO_YIELD_RATE_LIMIT": {
445+
await this.#rescheduleRun(run, safeBody.data.reset, durationInMs);
446+
break;
447+
}
444448
case "RESUME_WITH_PARALLEL_TASK": {
445449
await this.#resumeParallelRunWithTask(run, safeBody.data, durationInMs);
446450

@@ -667,6 +671,10 @@ export class PerformRunExecutionV3Service {
667671

668672
break;
669673
}
674+
case "AUTO_YIELD_RATE_LIMIT": {
675+
await this.#rescheduleRun(run, childError.reset, durationInMs);
676+
break;
677+
}
670678
case "CANCELED": {
671679
break;
672680
}
@@ -801,9 +809,9 @@ export class PerformRunExecutionV3Service {
801809
});
802810
}
803811

804-
async #resumeAutoYieldedRun(
812+
async #rescheduleRun(
805813
run: FoundRun,
806-
data: AutoYieldMetadata,
814+
reset: number,
807815
durationInMs: number,
808816
executionCount: number = 1
809817
) {
@@ -820,24 +828,14 @@ export class PerformRunExecutionV3Service {
820828
executionCount: {
821829
increment: executionCount,
822830
},
823-
autoYieldExecution: {
824-
create: [
825-
{
826-
location: data.location,
827-
timeRemaining: data.timeRemaining,
828-
timeElapsed: data.timeElapsed,
829-
limit: data.limit ?? 0,
830-
},
831-
],
832-
},
833831
forceYieldImmediately: false,
834832
},
835833
select: {
836834
executionCount: true,
837835
},
838836
});
839837

840-
await ResumeRunService.enqueue(run, tx);
838+
await ResumeRunService.enqueue(run, tx, new Date(reset));
841839
});
842840
}
843841

@@ -888,6 +886,46 @@ export class PerformRunExecutionV3Service {
888886
});
889887
}
890888

889+
async #resumeAutoYieldedRun(
890+
run: FoundRun,
891+
data: AutoYieldMetadata,
892+
durationInMs: number,
893+
executionCount: number = 1
894+
) {
895+
await $transaction(this.#prismaClient, async (tx) => {
896+
await tx.jobRun.update({
897+
where: {
898+
id: run.id,
899+
},
900+
data: {
901+
status: "WAITING_TO_EXECUTE",
902+
executionDuration: {
903+
increment: durationInMs,
904+
},
905+
executionCount: {
906+
increment: executionCount,
907+
},
908+
autoYieldExecution: {
909+
create: [
910+
{
911+
location: data.location,
912+
timeRemaining: data.timeRemaining,
913+
timeElapsed: data.timeElapsed,
914+
limit: data.limit ?? 0,
915+
},
916+
],
917+
},
918+
forceYieldImmediately: false,
919+
},
920+
select: {
921+
executionCount: true,
922+
},
923+
});
924+
925+
await ResumeRunService.enqueue(run, tx);
926+
});
927+
}
928+
891929
async #retryRunWithTask(
892930
run: FoundRun,
893931
data: RunJobRetryWithTask,

packages/core/src/schemas/api.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -671,6 +671,13 @@ export type RunJobAutoYieldWithCompletedTaskExecutionError = z.infer<
671671
typeof RunJobAutoYieldWithCompletedTaskExecutionErrorSchema
672672
>;
673673

674+
export const RunJobAutoYieldRateLimitErrorSchema = z.object({
675+
status: z.literal("AUTO_YIELD_RATE_LIMIT"),
676+
reset: z.coerce.number(),
677+
});
678+
679+
export type RunJobAutoYieldRateLimitError = z.infer<typeof RunJobAutoYieldRateLimitErrorSchema>;
680+
674681
export const RunJobInvalidPayloadErrorSchema = z.object({
675682
status: z.literal("INVALID_PAYLOAD"),
676683
errors: z.array(SchemaErrorSchema),
@@ -719,6 +726,7 @@ export const RunJobErrorResponseSchema = z.union([
719726
RunJobAutoYieldExecutionErrorSchema,
720727
RunJobAutoYieldWithCompletedTaskExecutionErrorSchema,
721728
RunJobYieldExecutionErrorSchema,
729+
RunJobAutoYieldRateLimitErrorSchema,
722730
RunJobErrorSchema,
723731
RunJobUnresolvedAuthErrorSchema,
724732
RunJobInvalidPayloadErrorSchema,
@@ -741,6 +749,7 @@ export const RunJobResponseSchema = z.discriminatedUnion("status", [
741749
RunJobAutoYieldExecutionErrorSchema,
742750
RunJobAutoYieldWithCompletedTaskExecutionErrorSchema,
743751
RunJobYieldExecutionErrorSchema,
752+
RunJobAutoYieldRateLimitErrorSchema,
744753
RunJobErrorSchema,
745754
RunJobUnresolvedAuthErrorSchema,
746755
RunJobInvalidPayloadErrorSchema,

packages/trigger-sdk/src/apiClient.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import { env } from "node:process";
4444

4545
import { z } from "zod";
4646
import { KeyValueStoreClient } from "./store/keyValueStoreClient";
47+
import { AutoYieldRateLimitError } from "./errors";
4748

4849
export type ApiClientOptions = {
4950
apiKey?: string;
@@ -818,6 +819,15 @@ async function zodfetchWithVersions<
818819
return;
819820
}
820821

822+
//rate limit, so we want to reschedule
823+
if (response.status === 429) {
824+
//unix timestamp in milliseconds
825+
const retryAfter = response.headers.get("x-ratelimit-reset");
826+
if (retryAfter) {
827+
throw new AutoYieldRateLimitError(parseInt(retryAfter));
828+
}
829+
}
830+
821831
if (response.status >= 400 && response.status < 500) {
822832
const rawBody = await safeResponseText(response);
823833
const body = safeJsonParse(rawBody);
@@ -894,7 +904,7 @@ async function zodfetchWithVersions<
894904
body: versionedSchema.parse(jsonBody),
895905
};
896906
} catch (error) {
897-
if (error instanceof UnknownVersionError) {
907+
if (error instanceof UnknownVersionError || error instanceof AutoYieldRateLimitError) {
898908
throw error;
899909
}
900910

@@ -987,6 +997,15 @@ async function zodfetch<TResponseSchema extends z.ZodTypeAny, TOptional extends
987997
return;
988998
}
989999

1000+
//rate limit, so we want to reschedule
1001+
if (response.status === 429) {
1002+
//unix timestamp in milliseconds
1003+
const retryAfter = response.headers.get("x-ratelimit-reset");
1004+
if (retryAfter) {
1005+
throw new AutoYieldRateLimitError(parseInt(retryAfter));
1006+
}
1007+
}
1008+
9901009
if (response.status >= 400 && response.status < 500) {
9911010
const body = await response.json();
9921011

@@ -1012,6 +1031,10 @@ async function zodfetch<TResponseSchema extends z.ZodTypeAny, TOptional extends
10121031

10131032
return schema.parse(jsonBody);
10141033
} catch (error) {
1034+
if (error instanceof AutoYieldRateLimitError) {
1035+
throw error;
1036+
}
1037+
10151038
if (retryCount < MAX_RETRIES) {
10161039
// retry with exponential backoff and jitter
10171040
const delay = exponentialBackoff(retryCount + 1);

packages/trigger-sdk/src/errors.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export class AutoYieldWithCompletedTaskExecutionError {
4545
) {}
4646
}
4747

48+
export class AutoYieldRateLimitError {
49+
constructor(public resetAtTimestamp: number) {}
50+
}
51+
4852
export class ParsedPayloadSchemaError {
4953
constructor(public schemaErrors: SchemaError[]) {}
5054
}
@@ -56,6 +60,7 @@ export type TriggerInternalError =
5660
| YieldExecutionError
5761
| AutoYieldExecutionError
5862
| AutoYieldWithCompletedTaskExecutionError
63+
| AutoYieldRateLimitError
5964
| ResumeWithParallelTaskError;
6065

6166
/** Use this function if you're using a `try/catch` block to catch errors.
@@ -72,6 +77,7 @@ export function isTriggerError(err: unknown): err is TriggerInternalError {
7277
err instanceof YieldExecutionError ||
7378
err instanceof AutoYieldExecutionError ||
7479
err instanceof AutoYieldWithCompletedTaskExecutionError ||
80+
err instanceof AutoYieldRateLimitError ||
7581
err instanceof ResumeWithParallelTaskError
7682
);
7783
}

packages/trigger-sdk/src/io.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { webcrypto } from "node:crypto";
2828
import { ApiClient } from "./apiClient";
2929
import {
3030
AutoYieldExecutionError,
31+
AutoYieldRateLimitError,
3132
AutoYieldWithCompletedTaskExecutionError,
3233
CanceledWithTaskError,
3334
ErrorWithTask,
@@ -1460,6 +1461,14 @@ export class IO {
14601461
cachedTasksCursor: this._cachedTasksCursor,
14611462
});
14621463
} catch (error) {
1464+
if (error instanceof AutoYieldRateLimitError) {
1465+
this._logger.debug("AutoYieldRateLimitError", {
1466+
error,
1467+
});
1468+
1469+
throw error;
1470+
}
1471+
14631472
return;
14641473
}
14651474
}

packages/trigger-sdk/src/triggerClient.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ import { ApiClient } from "./apiClient";
5252
import { ConcurrencyLimit, ConcurrencyLimitOptions } from "./concurrencyLimit";
5353
import {
5454
AutoYieldExecutionError,
55+
AutoYieldRateLimitError,
5556
AutoYieldWithCompletedTaskExecutionError,
5657
CanceledWithTaskError,
5758
ErrorWithTask,
@@ -1235,6 +1236,13 @@ export class TriggerClient {
12351236
};
12361237
}
12371238

1239+
if (error instanceof AutoYieldRateLimitError) {
1240+
return {
1241+
status: "AUTO_YIELD_RATE_LIMIT",
1242+
reset: error.resetAtTimestamp,
1243+
};
1244+
}
1245+
12381246
if (error instanceof YieldExecutionError) {
12391247
return { status: "YIELD_EXECUTION", key: error.key };
12401248
}

references/job-catalog/src/stressTest.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@ client.defineJob({
2222
await io.runTask(
2323
`task-${i}`,
2424
async (task) => {
25+
await new Promise((resolve) => setTimeout(resolve, 2000));
26+
2527
return {
2628
output: "a".repeat(30),
2729
};
2830
},
2931
{ name: `Task ${i}` }
3032
);
31-
32-
await new Promise((resolve) => setTimeout(resolve, 2000));
3333
}
3434

3535
// Now do a wait for 5 seconds

0 commit comments

Comments
 (0)