Skip to content

Commit 62a0a28

Browse files
committed
Add queue size limit guard on triggering tasks
1 parent b5fcb9f commit 62a0a28

File tree

10 files changed

+98
-1
lines changed

10 files changed

+98
-1
lines changed

apps/webapp/app/env.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,9 @@ const EnvironmentSchema = z.object({
213213
TASK_PAYLOAD_OFFLOAD_THRESHOLD: z.coerce.number().int().default(524_288), // 512KB
214214
TASK_PAYLOAD_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728), // 3MB
215215
TASK_RUN_METADATA_MAXIMUM_SIZE: z.coerce.number().int().default(4_096), // 4KB
216+
217+
MAXIMUM_DEV_QUEUE_SIZE: z.coerce.number().int().optional(),
218+
MAXIMUM_DEPLOYED_QUEUE_SIZE: z.coerce.number().int().optional(),
216219
});
217220

218221
export type Environment = z.infer<typeof EnvironmentSchema>;

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,10 @@ export class MarQS {
139139
return this.redis.zcard(this.keys.queueKey(env, queue, concurrencyKey));
140140
}
141141

142+
public async lengthOfEnvQueue(env: AuthenticatedEnvironment) {
143+
return this.redis.zcard(this.keys.envQueueKey(env));
144+
}
145+
142146
public async oldestMessageInQueue(
143147
env: AuthenticatedEnvironment,
144148
queue: string,

apps/webapp/app/v3/marqs/marqsKeyProducer.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,10 @@ export class MarQSShortKeyProducer implements MarQSKeyProducer {
134134
return `${constants.ENV_PART}:${envId}:${constants.QUEUE_PART}`;
135135
}
136136

137+
envQueueKey(env: AuthenticatedEnvironment): string {
138+
return [constants.ENV_PART, this.shortId(env.id), constants.QUEUE_PART].join(":");
139+
}
140+
137141
messageKey(messageId: string) {
138142
return `${constants.MESSAGE_PART}:${messageId}`;
139143
}

apps/webapp/app/v3/marqs/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ export interface MarQSKeyProducer {
2626
envConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
2727
orgConcurrencyLimitKey(env: AuthenticatedEnvironment): string;
2828
queueKey(env: AuthenticatedEnvironment, queue: string, concurrencyKey?: string): string;
29+
envQueueKey(env: AuthenticatedEnvironment): string;
2930
envSharedQueueKey(env: AuthenticatedEnvironment): string;
3031
sharedQueueKey(): string;
3132
sharedQueueScanPattern(): string;
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
2+
import { env } from "~/env.server";
3+
import { MarQS } from "./marqs/index.server";
4+
5+
export type QueueSizeGuardResult = {
6+
isWithinLimits: boolean;
7+
maximumSize?: number;
8+
queueSize?: number;
9+
};
10+
11+
export async function guardQueueSizeLimitsForEnv(
12+
environment: AuthenticatedEnvironment,
13+
marqs?: MarQS
14+
): Promise<QueueSizeGuardResult> {
15+
const maximumSize = getMaximumSizeForEnvironment(environment);
16+
17+
if (typeof maximumSize === "undefined") {
18+
return { isWithinLimits: true };
19+
}
20+
21+
if (!marqs) {
22+
return { isWithinLimits: true, maximumSize };
23+
}
24+
25+
const queueSize = await marqs.lengthOfEnvQueue(environment);
26+
27+
return {
28+
isWithinLimits: queueSize < maximumSize,
29+
maximumSize,
30+
queueSize,
31+
};
32+
}
33+
34+
function getMaximumSizeForEnvironment(environment: AuthenticatedEnvironment): number | undefined {
35+
if (environment.type === "DEVELOPMENT") {
36+
return environment.organization.maximumDevQueueSize ?? env.MAXIMUM_DEV_QUEUE_SIZE;
37+
} else {
38+
return environment.organization.maximumDeployedQueueSize ?? env.MAXIMUM_DEPLOYED_QUEUE_SIZE;
39+
}
40+
}

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
2222
import { findCurrentWorkerFromEnvironment } from "../models/workerDeployment.server";
2323
import { handleMetadataPacket } from "~/utils/packets";
2424
import { ExpireEnqueuedRunService } from "./expireEnqueuedRun.server";
25+
import { guardQueueSizeLimitsForEnv } from "../queueSizeLimits.server";
2526

2627
export type TriggerTaskServiceOptions = {
2728
idempotencyKey?: string;
@@ -82,6 +83,24 @@ export class TriggerTaskService extends BaseService {
8283
}
8384
}
8485

86+
const queueSizeGuard = await guardQueueSizeLimitsForEnv(environment, marqs);
87+
88+
logger.debug("Queue size guard result", {
89+
queueSizeGuard,
90+
environment: {
91+
id: environment.id,
92+
type: environment.type,
93+
organization: environment.organization,
94+
project: environment.project,
95+
},
96+
});
97+
98+
if (!queueSizeGuard.isWithinLimits) {
99+
throw new ServiceValidationError(
100+
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
101+
);
102+
}
103+
85104
if (
86105
body.options?.tags &&
87106
typeof body.options.tags !== "string" &&
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- AlterTable
2+
ALTER TABLE "Organization" ADD COLUMN "maximumDeployedQueueSize" INTEGER,
3+
ADD COLUMN "maximumDevQueueSize" INTEGER;

packages/database/prisma/schema.prisma

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ model Organization {
114114
/// This is deprecated and will be removed in the future
115115
maximumSchedulesLimit Int @default(5)
116116
117+
maximumDevQueueSize Int?
118+
maximumDeployedQueueSize Int?
119+
117120
createdAt DateTime @default(now())
118121
updatedAt DateTime @updatedAt
119122
deletedAt DateTime?

references/v3-catalog/src/trigger/simple.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import "server-only";
22
import { logger, SubtaskUnwrapError, task, tasks, wait } from "@trigger.dev/sdk/v3";
33
import { traceAsync } from "@/telemetry.js";
44
import { HeaderGenerator } from "header-generator";
5+
import { setTimeout as setTimeoutP } from "node:timers/promises";
56

67
let headerGenerator = new HeaderGenerator({
78
browsers: [{ name: "firefox", minVersion: 90 }, { name: "chrome", minVersion: 110 }, "safari"],
@@ -215,3 +216,22 @@ export const retryTask = task({
215216
throw new Error("This task will always fail");
216217
},
217218
});
219+
220+
export const maximumQueueDepthParent = task({
221+
id: "maximum-queue-depth-parent",
222+
run: async (payload: any) => {
223+
await maximumQueueDepthChild.trigger({});
224+
await maximumQueueDepthChild.trigger({});
225+
await maximumQueueDepthChild.trigger({});
226+
},
227+
});
228+
229+
export const maximumQueueDepthChild = task({
230+
id: "maximum-queue-depth-child",
231+
queue: {
232+
concurrencyLimit: 1,
233+
},
234+
run: async (payload: any) => {
235+
await setTimeoutP(10_000);
236+
},
237+
});

references/v3-catalog/trigger.config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export default defineConfig({
1818
instrumentations: [new OpenAIInstrumentation()],
1919
additionalFiles: ["wrangler/wrangler.toml"],
2020
retries: {
21-
enabledInDev: true,
21+
enabledInDev: false,
2222
default: {
2323
maxAttempts: 10,
2424
minTimeoutInMs: 5_000,

0 commit comments

Comments
 (0)